# NOTE: file-viewer listing metadata (330 lines, 16 KiB, Python); not part of the module.

# -*- encoding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from collections import defaultdict
from odoo import api, fields, models, _, Command
from odoo.tools import DEFAULT_SERVER_DATETIME_FORMAT, get_lang
class PurchaseOrderGroup(models.Model):
    """Technical record tying together purchase orders that are alternatives.

    The group owns no data besides the list of purchase orders pointing at it
    through ``purchase_group_id``.
    """
    _name = 'purchase.order.group'
    _description = "Technical model to group PO for call to tenders"

    # Purchase orders whose ``purchase_group_id`` points at this group.
    order_ids = fields.One2many('purchase.order', 'purchase_group_id')

    def write(self, vals):
        """Write ``vals``, then delete every group left with at most one order.

        :param dict vals: field values to write on the groups
        :return: the result of the parent ``write``
        """
        result = super().write(vals)
        # A group holding zero or one PO links that PO to nothing but itself,
        # so it is meaningless: remove it.
        obsolete_groups = self.filtered(lambda group: len(group.order_ids) <= 1)
        obsolete_groups.unlink()
        return result
class PurchaseOrder(models.Model):
    """Extend purchase orders with requisition links and alternative POs.

    A purchase order may reference a ``purchase.requisition`` and may belong
    to a ``purchase.order.group`` whose other members are its alternatives.
    """
    _inherit = 'purchase.order'

    # Requisition this order refers to; not duplicated when the PO is copied.
    requisition_id = fields.Many2one('purchase.requisition', string='Blanket Order', copy=False)
    # Mirror of the requisition's ``is_quantity_copy`` value (editable).
    is_quantity_copy = fields.Selection(related='requisition_id.is_quantity_copy', readonly=False)

    # Group shared by this PO and its alternatives.
    purchase_group_id = fields.Many2one('purchase.order.group')
    # Orders of the group, exposed directly on the PO (editable related field).
    alternative_po_ids = fields.One2many(
        'purchase.order', related='purchase_group_id.order_ids', readonly=False,
        domain="[('id', '!=', id), ('state', 'in', ['draft', 'sent', 'to approve'])]",
        string="Alternative POs", check_company=True,
        help="Other potential purchase orders for purchasing products")
    has_alternatives = fields.Boolean(
        "Has Alternatives", compute='_compute_has_alternatives',
        help="Whether or not this purchase order is linked to another purchase order as an alternative.")

    @api.depends('purchase_group_id')
    def _compute_has_alternatives(self):
        """Flag the orders that belong to a purchase order group."""
        self.has_alternatives = False
        self.filtered(lambda po: po.purchase_group_id).has_alternatives = True

    @api.onchange('requisition_id')
    def _onchange_requisition_id(self):
        """Fill the order from the selected requisition.

        Sets partner, fiscal position, payment term, company, currency,
        origin, notes and order date. When the requisition type has
        ``line_copy == 'copy'``, order lines are also generated from the
        requisition lines.
        """
        if not self.requisition_id:
            return

        self = self.with_company(self.company_id)
        requisition = self.requisition_id
        # Keep the partner already chosen on the order, else use the
        # requisition's vendor.
        partner = self.partner_id or requisition.vendor_id
        payment_term = partner.property_supplier_payment_term_id

        FiscalPosition = self.env['account.fiscal.position']
        fpos = FiscalPosition.with_company(self.company_id)._get_fiscal_position(partner)

        self.partner_id = partner.id
        self.fiscal_position_id = fpos.id
        self.payment_term_id = payment_term.id
        self.company_id = requisition.company_id.id
        self.currency_id = requisition.currency_id.id
        # NOTE(review): the source had lost its indentation; the original
        # ``else`` is read as pairing with ``if self.origin`` - confirm.
        if not self.origin:
            self.origin = requisition.name
        elif requisition.name and requisition.name not in self.origin.split(', '):
            # Append the requisition name only once to the existing origin.
            self.origin = self.origin + ', ' + requisition.name
        self.notes = requisition.description
        self.date_order = fields.Datetime.now()

        if requisition.type_id.line_copy != 'copy':
            return

        # Create PO lines if necessary
        order_lines = []
        for line in requisition.line_ids:
            # Compute name in the partner's language (fallback: user's language)
            product_lang = line.product_id.with_context(
                lang=partner.lang or self.env.user.lang,
                partner_id=partner.id
            )
            name = product_lang.display_name
            if product_lang.description_purchase:
                name += '\n' + product_lang.description_purchase

            # Compute taxes: only the requisition company's supplier taxes,
            # mapped through the fiscal position
            taxes_ids = fpos.map_tax(line.product_id.supplier_taxes_id.filtered(
                lambda tax: tax.company_id == requisition.company_id)).ids

            # Compute quantity and price_unit in the product's purchase UoM
            if line.product_uom_id != line.product_id.uom_po_id:
                product_qty = line.product_uom_id._compute_quantity(line.product_qty, line.product_id.uom_po_id)
                price_unit = line.product_uom_id._compute_price(line.price_unit, line.product_id.uom_po_id)
            else:
                product_qty = line.product_qty
                price_unit = line.price_unit

            if requisition.type_id.quantity_copy != 'copy':
                product_qty = 0

            # Create PO line
            order_line_values = line._prepare_purchase_order_line(
                name=name, product_qty=product_qty, price_unit=price_unit,
                taxes_ids=taxes_ids)
            order_lines.append(Command.create(order_line_values))
        self.order_line = order_lines

    def button_confirm(self):
        """Confirm the orders, warning first about still-open alternatives.

        Unless the context key ``skip_alternative_check`` is set, the
        presence of alternative POs in state draft / sent / to approve makes
        this method return the action opening the alternative warning wizard
        instead of confirming. After confirmation, for orders whose
        requisition type is exclusive, the requisition's other POs are
        cancelled.

        :return: a window action (warning wizard) or the parent's result
        """
        if self.alternative_po_ids and not self.env.context.get('skip_alternative_check', False):
            alternative_po_ids = self.alternative_po_ids.filtered(
                lambda po: po.state in ['draft', 'sent', 'to approve'] and po.id not in self.ids)
            if alternative_po_ids:
                view = self.env.ref('purchase_requisition.purchase_requisition_alternative_warning_form')
                return {
                    'name': _("What about the alternative Requests for Quotations?"),
                    'type': 'ir.actions.act_window',
                    'view_mode': 'form',
                    'res_model': 'purchase.requisition.alternative.warning',
                    'views': [(view.id, 'form')],
                    'target': 'new',
                    'context': dict(self.env.context, default_alternative_po_ids=alternative_po_ids.ids, default_po_ids=self.ids),
                }
        res = super().button_confirm()
        for po in self:
            if not po.requisition_id:
                continue
            if po.requisition_id.type_id.exclusive == 'exclusive':
                others_po = po.requisition_id.mapped('purchase_ids').filtered(lambda r: r.id != po.id)
                others_po.button_cancel()
                # NOTE(review): the source had lost its indentation; this
                # check is read as part of the exclusive branch - confirm.
                if po.state not in ['draft', 'sent', 'to approve']:
                    po.requisition_id.action_done()
        return res

    @api.model_create_multi
    def create(self, vals_list):
        """Create orders, grouping them with their origin PO when relevant.

        When the context carries ``origin_po_id``, the new orders are added
        to that PO's group (a group is created if it has none). A note
        linking to the requisition is posted on every order having one.

        :param list vals_list: list of value dicts, one per order to create
        :return: the created purchase orders
        """
        orders = super().create(vals_list)
        origin_po_id = self.env.context.get('origin_po_id')
        if origin_po_id:
            # po created as an alt to another PO:
            origin_po = self.env['purchase.order'].browse(origin_po_id)
            if origin_po.purchase_group_id:
                origin_po.purchase_group_id.order_ids |= orders
            else:
                self.env['purchase.order.group'].create({'order_ids': [Command.set(origin_po.ids + orders.ids)]})
        for order in orders:
            if order.requisition_id:
                order.message_post_with_source(
                    'mail.message_origin_link',
                    render_values={'self': order, 'origin': order.requisition_id},
                    subtype_xmlid='mail.mt_note',
                )
        return orders

    def write(self, vals):
        """Write ``vals`` and keep purchase order groups consistent.

        Besides the regular write, this posts a note when the requisition is
        set, creates or removes groups when ``alternative_po_ids`` is
        written, and merges the previous groups into the new one when
        ``purchase_group_id`` is written.

        :param dict vals: field values to write
        :return: the result of the parent ``write``
        """
        if vals.get('purchase_group_id', False):
            # store in case linking to a PO with existing linkages
            orig_purchase_group = self.purchase_group_id
        result = super().write(vals)
        if vals.get('requisition_id'):
            for order in self:
                order.message_post_with_source(
                    'mail.message_origin_link',
                    render_values={'self': order, 'origin': order.requisition_id, 'edit': True},
                    subtype_xmlid='mail.mt_note',
                )
        if vals.get('alternative_po_ids', False):
            if not self.purchase_group_id and len(self.alternative_po_ids + self) > len(self):
                # this can create a new group + delete an existing one (or more) when linking to
                # already linked PO(s), but this is simpler than additional logic checking if
                # exactly 1 exists or merging multiple groups if > 1
                self.env['purchase.order.group'].create({'order_ids': [Command.set(self.ids + self.alternative_po_ids.ids)]})
            elif self.purchase_group_id and len(self.alternative_po_ids + self) <= 1:
                # write in purchase group isn't called so we have to manually unlink obsolete groups here
                self.purchase_group_id.unlink()
        if vals.get('purchase_group_id', False):
            # the write is for multiple POs => don't double count the POs of the final group
            additional_groups = orig_purchase_group - self.purchase_group_id
            if additional_groups:
                additional_pos = (additional_groups.order_ids - self.purchase_group_id.order_ids)
                additional_groups.unlink()
                if additional_pos:
                    self.purchase_group_id.order_ids |= additional_pos
        return result

    def action_create_alternative(self):
        """Return the action opening the "create alternative" wizard.

        :return: window action with ``default_origin_po_id`` set to this PO
        """
        # Pass the context positionally so that an already present
        # ``default_origin_po_id`` key is overridden; ``dict(**context, key=...)``
        # raises TypeError on a duplicate key.
        ctx = dict(self.env.context, default_origin_po_id=self.id)
        return {
            'name': _('Create alternative'),
            'type': 'ir.actions.act_window',
            'view_mode': 'form',
            'res_model': 'purchase.requisition.create.alternative',
            'view_id': self.env.ref('purchase_requisition.purchase_requisition_create_alternative_form').id,
            'target': 'new',
            'context': ctx,
        }

    def action_compare_alternative_lines(self):
        """Return the action listing the lines of this PO and its alternatives.

        Lines with a ``display_type`` are excluded by the action's domain and
        the list is grouped by product by default.

        :return: window action on ``purchase.order.line``
        """
        ctx = dict(
            self.env.context,
            search_default_groupby_product=True,
            purchase_order_id=self.id,
        )
        view_id = self.env.ref('purchase_requisition.purchase_order_line_compare_tree').id
        return {
            'name': _('Compare Order Lines'),
            'type': 'ir.actions.act_window',
            'view_mode': 'list',
            'res_model': 'purchase.order.line',
            'views': [(view_id, "list")],
            'domain': [('order_id', 'in', (self | self.alternative_po_ids).ids), ('display_type', '=', False)],
            'context': ctx,
        }

    def get_tender_best_lines(self):
        """Find, per product, the best lines among this PO and its alternatives.

        Lines without quantity, without subtotal, or in state cancel /
        purchase / done are ignored. When the compared orders use more than
        one currency, amounts are divided by each order's ``currency_rate``
        before being compared. Ties are kept: every line sharing the best
        value is reported.

        :return: tuple of three lists of ``purchase.order.line`` ids: lines
            with the lowest subtotal, lines with the earliest planned date,
            and lines with the lowest unit price
        """
        product_to_best_price_line = defaultdict(lambda: self.env['purchase.order.line'])
        product_to_best_date_line = defaultdict(lambda: self.env['purchase.order.line'])
        product_to_best_price_unit = defaultdict(lambda: self.env['purchase.order.line'])
        po_alternatives = self | self.alternative_po_ids

        multiple_currencies = len(po_alternatives.currency_id) > 1

        for line in po_alternatives.order_line:
            if not line.product_qty or not line.price_subtotal or line.state in ['cancel', 'purchase', 'done']:
                continue
            product = line.product_id

            # if no best price line => no best price unit line either
            if not product_to_best_price_line[product]:
                product_to_best_price_line[product] = line
                product_to_best_price_unit[product] = line
            else:
                best_price_line = product_to_best_price_line[product][0]
                best_unit_line = product_to_best_price_unit[product][0]
                price_subtotal = line.price_subtotal
                price_unit = line.price_unit
                current_price_subtotal = best_price_line.price_subtotal
                current_price_unit = best_unit_line.price_unit
                if multiple_currencies:
                    # Apply each order's currency rate so amounts are comparable.
                    price_subtotal /= line.order_id.currency_rate
                    price_unit /= line.order_id.currency_rate
                    current_price_subtotal /= best_price_line.order_id.currency_rate
                    current_price_unit /= best_unit_line.order_id.currency_rate

                if current_price_subtotal > price_subtotal:
                    product_to_best_price_line[product] = line
                elif current_price_subtotal == price_subtotal:
                    product_to_best_price_line[product] |= line
                if current_price_unit > price_unit:
                    product_to_best_price_unit[product] = line
                elif current_price_unit == price_unit:
                    product_to_best_price_unit[product] |= line

            best_date_lines = product_to_best_date_line[product]
            if not best_date_lines or best_date_lines[0].date_planned > line.date_planned:
                product_to_best_date_line[product] = line
            elif best_date_lines[0].date_planned == line.date_planned:
                product_to_best_date_line[product] |= line

        def unique_line_ids(product_to_lines):
            # Flatten the per-product recordsets into a list of distinct ids.
            line_ids = set()
            for lines in product_to_lines.values():
                line_ids.update(lines.ids)
            return list(line_ids)

        return (
            unique_line_ids(product_to_best_price_line),
            unique_line_ids(product_to_best_date_line),
            unique_line_ids(product_to_best_price_unit),
        )
class PurchaseOrderLine(models.Model):
    """Extend purchase order lines with requisition pricing and tender helpers."""
    _inherit = 'purchase.order.line'

    def _compute_price_unit_and_date_planned_and_name(self):
        """Compute price, planned date and name, preferring requisition lines.

        A line whose product appears on its order's requisition takes the
        unit price of the first matching requisition line; its planned date
        (only when empty) and its name are derived from the selected seller.
        All other lines are handed to the parent implementation.
        """
        lines_for_parent = self.env['purchase.order.line']
        for po_line in self:
            order = po_line.order_id
            requisition_lines = order.requisition_id.line_ids
            if po_line.product_id.id not in requisition_lines.product_id.ids:
                lines_for_parent |= po_line
                continue
            for req_line in requisition_lines:
                if req_line.product_id != po_line.product_id:
                    continue
                po_line.price_unit = req_line.product_uom_id._compute_price(
                    req_line.price_unit, po_line.product_uom)
                vendor = order.partner_id or order.requisition_id.vendor_id
                seller = po_line.product_id._select_seller(
                    partner_id=vendor,
                    quantity=po_line.product_qty,
                    date=order.date_order and order.date_order.date(),
                    uom_id=req_line.product_uom_id,
                    params={'order_id': order})
                if not po_line.date_planned:
                    planned = po_line._get_date_planned(seller)
                    po_line.date_planned = planned.strftime(DEFAULT_SERVER_DATETIME_FORMAT)
                product_ctx = {
                    'seller_id': seller.id,
                    'lang': get_lang(po_line.env, vendor.lang).code,
                }
                description = po_line._get_product_purchase_description(
                    po_line.product_id.with_context(product_ctx))
                if req_line.product_description_variants:
                    description += '\n' + req_line.product_description_variants
                po_line.name = description
                # Only the first matching requisition line is used.
                break
        super(PurchaseOrderLine, lines_for_parent)._compute_price_unit_and_date_planned_and_name()

    def action_clear_quantities(self):
        """Set the quantity to zero on lines not in state cancel/purchase/done.

        :return: a notification action when some lines were skipped because
            of their state, ``False`` otherwise
        """
        clearable_lines = self.filtered(
            lambda line: line.state not in ['cancel', 'purchase', 'done'])
        clearable_lines.write({'product_qty': 0})
        if len(clearable_lines) >= len(self):
            return False
        return {
            'type': 'ir.actions.client',
            'tag': 'display_notification',
            'params': {
                'title': _("Some not cleared"),
                'message': _("Some quantities were not cleared because their status is not a RFQ status."),
                'sticky': False,
            }
        }

    def action_choose(self):
        """Clear the quantities of the competing lines for the same products.

        Competing lines are the other lines, with a quantity, of this line's
        order and its alternative orders that share a product with ``self``.

        :return: the result of ``action_clear_quantities`` on those lines, or
            a notification action when there is nothing to clear
        """
        chosen_product_ids = self.product_id.ids
        chosen_line_ids = self.ids
        compared_orders = self.order_id | self.order_id.alternative_po_ids
        competing_lines = compared_orders.mapped('order_line').filtered(
            lambda line: line.product_qty
            and line.product_id.id in chosen_product_ids
            and line.id not in chosen_line_ids)
        if not competing_lines:
            return {
                'type': 'ir.actions.client',
                'tag': 'display_notification',
                'params': {
                    'title': _("Nothing to clear"),
                    'message': _("There are no quantities to clear."),
                    'sticky': False,
                }
            }
        return competing_lines.action_clear_quantities()