stock_picking_batch/models/stock_picking.py

255 lines
12 KiB
Python
Raw Permalink Normal View History

# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from odoo import _, api, Command, fields, models
from odoo.osv import expression
from odoo.exceptions import ValidationError
class StockPickingType(models.Model):
_inherit = "stock.picking.type"
count_picking_batch = fields.Integer(compute='_compute_picking_count')
count_picking_wave = fields.Integer(compute='_compute_picking_count')
auto_batch = fields.Boolean('Automatic Batches',
help="Automatically put pickings into batches as they are confirmed when possible.")
batch_group_by_partner = fields.Boolean('Contact', help="Automatically group batches by contacts.")
batch_group_by_destination = fields.Boolean('Destination Country', help="Automatically group batches by destination country.")
batch_group_by_src_loc = fields.Boolean('Source Location',
help="Automatically group batches by their source location.")
batch_group_by_dest_loc = fields.Boolean('Destination Location',
help="Automatically group batches by their destination location.")
batch_max_lines = fields.Integer("Maximum lines per batch",
help="A transfer will not be automatically added to batches that will exceed this number of lines if the transfer is added to it.\n"
"Leave this value as '0' if no line limit.")
batch_max_pickings = fields.Integer("Maximum transfers per batch",
help="A transfer will not be automatically added to batches that will exceed this number of transfers.\n"
"Leave this value as '0' if no transfer limit.")
batch_auto_confirm = fields.Boolean("Auto-confirm", default=True)
def _compute_picking_count(self):
super()._compute_picking_count()
data = self.env['stock.picking.batch']._read_group(
[('state', 'not in', ('done', 'cancel')), ('picking_type_id', 'in', self.ids)],
['picking_type_id', 'is_wave'], ['__count'])
count = {(picking_type.id, is_wave): count for picking_type, is_wave, count in data}
for record in self:
record.count_picking_wave = count.get((record.id, True), 0)
record.count_picking_batch = count.get((record.id, False), 0)
@api.model
def _get_batch_group_by_keys(self):
return ['batch_group_by_partner', 'batch_group_by_destination', 'batch_group_by_src_loc', 'batch_group_by_dest_loc']
@api.constrains(lambda self: self._get_batch_group_by_keys() + ['auto_batch'])
def _validate_auto_batch_group_by(self):
group_by_keys = self._get_batch_group_by_keys()
for picking_type in self:
if not picking_type.auto_batch:
continue
if not any(picking_type[key] for key in group_by_keys):
raise ValidationError(_("If the Automatic Batches feature is enabled, at least one 'Group by' option must be selected."))
def get_action_picking_tree_batch(self):
return self._get_action('stock_picking_batch.stock_picking_batch_action')
def get_action_picking_tree_wave(self):
return self._get_action('stock_picking_batch.action_picking_tree_wave')
class StockPicking(models.Model):
_inherit = "stock.picking"
batch_id = fields.Many2one(
'stock.picking.batch', string='Batch Transfer',
check_company=True,
help='Batch associated to this transfer', index=True, copy=False)
@api.model_create_multi
def create(self, vals_list):
pickings = super().create(vals_list)
for picking, vals in zip(pickings, vals_list):
if vals.get('batch_id'):
if not picking.batch_id.picking_type_id:
picking.batch_id.picking_type_id = picking.picking_type_id[0]
picking.batch_id._sanity_check()
return pickings
def write(self, vals):
old_batches = self.batch_id
res = super().write(vals)
if vals.get('batch_id'):
old_batches.filtered(lambda b: not b.picking_ids).state = 'cancel'
if not self.batch_id.picking_type_id:
self.batch_id.picking_type_id = self.picking_type_id[0]
self.batch_id._sanity_check()
# assign batch users to batch pickings
self.batch_id.picking_ids.assign_batch_user(self.batch_id.user_id.id)
return res
def action_add_operations(self):
view = self.env.ref('stock_picking_batch.view_move_line_tree_detailed_wave')
return {
'name': _('Add Operations'),
'type': 'ir.actions.act_window',
'view_mode': 'list',
'view': view,
'views': [(view.id, 'tree')],
'res_model': 'stock.move.line',
'target': 'new',
'domain': [
('picking_id', 'in', self.ids),
('state', '!=', 'done')
],
'context': dict(
self.env.context,
picking_to_wave=self.ids,
active_wave_id=self.env.context.get('active_wave_id').id,
search_default_by_location=True,
)}
def action_confirm(self):
res = super().action_confirm()
for picking in self:
picking._find_auto_batch()
return res
def button_validate(self):
res = super().button_validate()
to_assign_ids = set()
if self and self.env.context.get('pickings_to_detach'):
self.env['stock.picking'].browse(self.env.context['pickings_to_detach']).batch_id = False
to_assign_ids.update(self.env.context['pickings_to_detach'])
for picking in self:
if picking.state != 'done':
continue
# Avoid inconsistencies in states of the same batch when validating a single picking in a batch.
if picking.batch_id and any(p.state != 'done' for p in picking.batch_id.picking_ids):
picking.batch_id = None
# If backorder were made, if auto-batch is enabled, seek a batch for each of them with the selected criterias.
to_assign_ids.update(picking.backorder_ids.ids)
# To avoid inconsistencies, all incorrect pickings must be removed before assigning backorder pickings
assignable_pickings = self.env['stock.picking'].browse(to_assign_ids)
for picking in assignable_pickings:
picking._find_auto_batch()
return res
def action_cancel(self):
res = super().action_cancel()
for picking in self:
if picking.batch_id and any(picking.state != 'cancel' for picking in picking.batch_id.picking_ids):
picking.batch_id = None
return res
def _should_show_transfers(self):
if len(self.batch_id) == 1 and len(self) == (len(self.batch_id.picking_ids) - len(self.env.context.get('pickings_to_detach', []))):
return False
return super()._should_show_transfers()
def _find_auto_batch(self):
self.ensure_one()
# Check if auto_batch is enabled for this picking.
if not self.picking_type_id.auto_batch or self.batch_id or not self.move_ids or not self._is_auto_batchable():
return False
# Try to find a compatible batch to insert the picking
possible_batches = self.env['stock.picking.batch'].sudo().search(self._get_possible_batches_domain())
for batch in possible_batches:
if batch._is_picking_auto_mergeable(self):
batch.picking_ids |= self
return batch
# If no batch were found, try to find a compatible picking and put them both in a new batch.
possible_pickings = self.env['stock.picking'].search(self._get_possible_pickings_domain())
for picking in possible_pickings:
if self._is_auto_batchable(picking):
# Create new batch with both pickings
new_batch = self.env['stock.picking.batch'].sudo().create({
'picking_ids': [Command.link(self.id), Command.link(picking.id)],
'company_id': self.company_id.id if self.company_id else False,
'picking_type_id': self.picking_type_id.id,
})
if picking.picking_type_id.batch_auto_confirm:
new_batch.action_confirm()
return new_batch
# If nothing was found after those two steps, then no batch is doable given the conditions
return False
def _is_auto_batchable(self, picking=None):
""" Verifies if a picking can be put in a batch with another picking without violating auto_batch constrains.
"""
if self.state != 'assigned':
return False
res = True
if not picking:
picking = self.env['stock.picking']
if self.picking_type_id.batch_max_lines:
res = res and (len(self.move_ids) + len(picking.move_ids) <= self.picking_type_id.batch_max_lines)
if self.picking_type_id.batch_max_pickings:
# Sounds absurd. BUT if we put "batch max picking" to a value <= 1, makes sense ... Or not. Because then there is no point to batch.
res = res and self.picking_type_id.batch_max_pickings > 1
return res
def _get_possible_pickings_domain(self):
self.ensure_one()
domain = [
('id', '!=', self.id),
('company_id', '=', self.company_id.id if self.company_id else False),
('state', '=', 'assigned'),
('picking_type_id', '=', self.picking_type_id.id),
('batch_id', '=', False),
]
if self.picking_type_id.batch_group_by_partner:
domain = expression.AND([domain, [('partner_id', '=', self.partner_id.id)]])
if self.picking_type_id.batch_group_by_destination:
domain = expression.AND([domain, [('partner_id.country_id', '=', self.partner_id.country_id.id)]])
if self.picking_type_id.batch_group_by_src_loc:
domain = expression.AND([domain, [('location_id', '=', self.location_id.id)]])
if self.picking_type_id.batch_group_by_dest_loc:
domain = expression.AND([domain, [('location_dest_id', '=', self.location_dest_id.id)]])
return domain
def _get_possible_batches_domain(self):
self.ensure_one()
domain = [
('state', 'in', ('draft', 'in_progress') if self.picking_type_id.batch_auto_confirm else ('draft',)),
('picking_type_id', '=', self.picking_type_id.id),
('company_id', '=', self.company_id.id if self.company_id else False),
]
if self.picking_type_id.batch_group_by_partner:
domain = expression.AND([domain, [('picking_ids.partner_id', '=', self.partner_id.id)]])
if self.picking_type_id.batch_group_by_destination:
domain = expression.AND([domain, [('picking_ids.partner_id.country_id', '=', self.partner_id.country_id.id)]])
if self.picking_type_id.batch_group_by_src_loc:
domain = expression.AND([domain, [('picking_ids.location_id', '=', self.location_id.id)]])
if self.picking_type_id.batch_group_by_dest_loc:
domain = expression.AND([domain, [('picking_ids.location_dest_id', '=', self.location_dest_id.id)]])
return domain
def assign_batch_user(self, user_id):
if not user_id:
return
pickings = self.filtered(lambda p: p.user_id.id != user_id)
pickings.write({'user_id': user_id})
for pick in pickings:
log_message = _('Assigned to %s Responsible', pick.batch_id._get_html_link())
pick.message_post(body=log_message)
def _package_move_lines(self):
return super(StockPicking, self.batch_id.picking_ids if self.batch_id else self)._package_move_lines()
def action_view_batch(self):
self.ensure_one()
return {
'type': 'ir.actions.act_window',
'res_model': 'stock.picking.batch',
'res_id': self.batch_id.id,
'view_mode': 'form'
}