# -*- coding: utf-8 -*- # Part of Odoo. See LICENSE file for full copyright and licensing details. from collections import defaultdict import itertools import random from odoo import api, fields, models, _ from odoo.exceptions import UserError, ValidationError from odoo.fields import Command from odoo.tools.float_utils import float_is_zero, float_round from odoo.osv import expression def _generate_random_reward_code(): return str(random.getrandbits(32)) class SaleOrder(models.Model): _inherit = 'sale.order' # Contains how much points should be given to a coupon upon validating the order applied_coupon_ids = fields.Many2many( comodel_name='loyalty.card', string="Manually Applied Coupons", copy=False) code_enabled_rule_ids = fields.Many2many( comodel_name='loyalty.rule', string="Manually Triggered Rules", copy=False) coupon_point_ids = fields.One2many( comodel_name='sale.order.coupon.points', inverse_name='order_id', copy=False) reward_amount = fields.Float(compute='_compute_reward_total') @api.depends('order_line') def _compute_reward_total(self): for order in self: reward_amount = 0 for line in order.order_line: if not line.reward_id: continue if line.reward_id.reward_type != 'product': reward_amount += line.price_subtotal else: # Free product are 'regular' product lines with a price_unit of 0 reward_amount -= line.product_id.lst_price * line.product_uom_qty order.reward_amount = reward_amount def _get_no_effect_on_threshold_lines(self): """ Returns the lines that have no effect on the minimum amount to reach """ self.ensure_one() return self.env['sale.order.line'] @api.returns('self', lambda value: value.id) def copy(self, default=None): order = super(SaleOrder, self).copy(default) reward_lines = order.order_line.filtered('is_reward_line') if reward_lines: reward_lines.unlink() return order def action_confirm(self): for order in self: all_coupons = order.applied_coupon_ids | order.coupon_point_ids.coupon_id | order.order_line.coupon_id if any(order._get_real_points_for_coupon(coupon) < 0 for coupon in all_coupons): raise ValidationError(_('One or more rewards on the sale order is invalid. Please check them.')) order._update_programs_and_rewards() # Remove any coupon from 'current' program that don't claim any reward. # This is to avoid ghost coupons that are lost forever. # Claiming a reward for that program will require either an automated check or a manual input again. reward_coupons = self.order_line.coupon_id self.coupon_point_ids.filtered( lambda pe: pe.coupon_id.program_id.applies_on == 'current' and pe.coupon_id not in reward_coupons ).coupon_id.sudo().unlink() # Add/remove the points to our coupons for coupon, change in self.filtered(lambda s: s.state != 'sale')._get_point_changes().items(): coupon.points += change res = super().action_confirm() self._send_reward_coupon_mail() return res def _action_cancel(self): previously_confirmed = self.filtered(lambda s: s.state == 'sale') res = super()._action_cancel() # Add/remove the points to our coupons for coupon, changes in previously_confirmed.filtered( lambda s: s.state != 'sale' )._get_point_changes().items(): coupon.points -= changes # Remove any rewards self.order_line.filtered(lambda l: l.is_reward_line).unlink() self.coupon_point_ids.coupon_id.sudo().filtered( lambda c: not c.program_id.is_nominative and c.order_id in self and not c.use_count)\ .unlink() self.coupon_point_ids.unlink() return res def action_open_reward_wizard(self): self.ensure_one() self._update_programs_and_rewards() claimable_rewards = self._get_claimable_rewards() if len(claimable_rewards) == 1: coupon = next(iter(claimable_rewards)) if len(claimable_rewards[coupon]) == 1: self._apply_program_reward(claimable_rewards[coupon], coupon) return True elif not claimable_rewards: return True return self.env['ir.actions.actions']._for_xml_id('sale_loyalty.sale_loyalty_reward_wizard_action') def _send_reward_coupon_mail(self): coupons = self.env['loyalty.card'] for order in self: coupons |= order._get_reward_coupons() if coupons: coupons._send_creation_communication(force_send=True) def _get_applied_global_discount_lines(self): """ Returns the first line of the currently applied global discount or False """ self.ensure_one() return self.order_line.filtered(lambda l: l.reward_id.is_global_discount) def _get_applied_global_discount(self): """ Returns the currently applied global discount reward or False """ return self._get_applied_global_discount_lines().reward_id def _get_reward_values_product(self, reward, coupon, product=None, **kwargs): """ Returns an array of dict containing the values required for the reward lines """ self.ensure_one() assert reward.reward_type == 'product' reward_products = reward.reward_product_ids product = product or reward_products[:1] if not product or not product in reward_products: raise UserError(_('Invalid product to claim.')) taxes = self.fiscal_position_id.map_tax(product.taxes_id.filtered(lambda t: t.company_id == self.company_id)) points = self._get_real_points_for_coupon(coupon) claimable_count = float_round(points / reward.required_points, precision_rounding=1, rounding_method='DOWN') if not reward.clear_wallet else 1 cost = points if reward.clear_wallet else claimable_count * reward.required_points return [{ 'name': _("Free Product - %(product)s", product=product.with_context(display_default_code=False).display_name), 'product_id': product.id, 'discount': 100, 'product_uom_qty': reward.reward_product_qty * claimable_count, 'reward_id': reward.id, 'coupon_id': coupon.id, 'points_cost': cost, 'reward_identifier_code': _generate_random_reward_code(), 'product_uom': product.uom_id.id, 'sequence': max(self.order_line.filtered(lambda x: not x.is_reward_line).mapped('sequence'), default=10) + 1, 'tax_id': [(Command.CLEAR, 0, 0)] + [(Command.LINK, tax.id, False) for tax in taxes] }] def _discountable_order(self, reward): """ Returns the discountable and discountable_per_tax for a discount that applies to the whole order """ self.ensure_one() assert reward.discount_applicability == 'order' discountable = 0 discountable_per_tax = defaultdict(int) lines = self.order_line if reward.program_id.is_payment_program else (self.order_line - self._get_no_effect_on_threshold_lines()) for line in lines: # Ignore lines from this reward if not line.product_uom_qty or not line.price_unit: continue tax_data = line._convert_to_tax_base_line_dict() # To compute the discountable amount we get the fixed tax amount and # subtract it from the order total. This way fixed taxes will not be discounted tax_data['taxes'] = tax_data['taxes'].filtered(lambda t: t.amount_type == 'fixed') tax_results = self.env['account.tax']._compute_taxes([tax_data]) totals = list(tax_results['totals'].values())[0] discountable += line.price_total - totals['amount_tax'] taxes = line.tax_id.filtered(lambda t: t.amount_type != 'fixed') discountable_per_tax[taxes] += totals['amount_untaxed'] return discountable, discountable_per_tax def _cheapest_line(self): self.ensure_one() cheapest_line = False for line in (self.order_line - self._get_no_effect_on_threshold_lines()): if line.reward_id or not line.product_uom_qty or not line.price_unit: continue if not cheapest_line or cheapest_line.price_unit > line.price_unit: cheapest_line = line return cheapest_line def _discountable_cheapest(self, reward): """ Returns the discountable and discountable_per_tax for a discount that applies to the cheapest line """ self.ensure_one() assert reward.discount_applicability == 'cheapest' cheapest_line = self._cheapest_line() discountable = cheapest_line.price_unit * (1 - (cheapest_line.discount or 0) / 100) taxes = cheapest_line.tax_id.filtered(lambda t: t.amount_type != 'fixed') return discountable, {taxes: discountable} def _get_specific_discountable_lines(self, reward): """ Returns all lines to which `reward` can apply """ self.ensure_one() assert reward.discount_applicability == 'specific' discountable_lines = self.env['sale.order.line'] for line in (self.order_line - self._get_no_effect_on_threshold_lines()): domain = reward._get_discount_product_domain() if not line.reward_id and line.product_id.filtered_domain(domain): discountable_lines |= line return discountable_lines def _discountable_specific(self, reward): """ Special function to compute the discountable for 'specific' types of discount. The goal of this function is to make sure that applying a 5$ discount on an order with a 5$ product and a 5% discount does not make the order go below 0. Returns the discountable and discountable_per_tax for a discount that only applies to specific products. """ self.ensure_one() assert reward.discount_applicability == 'specific' lines_to_discount = self.env['sale.order.line'] discount_lines = defaultdict(lambda: self.env['sale.order.line']) order_lines = self.order_line - self._get_no_effect_on_threshold_lines() remaining_amount_per_line = defaultdict(int) for line in order_lines: if not line.product_uom_qty or not line.price_total: continue remaining_amount_per_line[line] = line.price_total domain = reward._get_discount_product_domain() if not line.reward_id and line.product_id.filtered_domain(domain): lines_to_discount |= line elif line.reward_id.reward_type == 'discount': discount_lines[line.reward_identifier_code] |= line order_lines -= self.order_line.filtered("reward_id") cheapest_line = False for lines in discount_lines.values(): line_reward = lines.reward_id discounted_lines = order_lines if line_reward.discount_applicability == 'cheapest': cheapest_line = cheapest_line or self._cheapest_line() discounted_lines = cheapest_line elif line_reward.discount_applicability == 'specific': discounted_lines = self._get_specific_discountable_lines(line_reward) if not discounted_lines: continue common_lines = discounted_lines & lines_to_discount if line_reward.discount_mode == 'percent': for line in discounted_lines: if line_reward.discount_applicability == 'cheapest': remaining_amount_per_line[line] *= (1 - line_reward.discount / 100 / line.product_uom_qty) else: remaining_amount_per_line[line] *= (1 - line_reward.discount / 100) else: non_common_lines = discounted_lines - lines_to_discount # Fixed prices are per tax discounted_amounts = {line.tax_id.filtered(lambda t: t.amount_type != 'fixed'): abs(line.price_total) for line in lines} for line in itertools.chain(non_common_lines, common_lines): # For gift card and eWallet programs we have no tax but we can consume the amount completely if lines.reward_id.program_id.is_payment_program: discounted_amount = discounted_amounts[lines.tax_id.filtered(lambda t: t.amount_type != 'fixed')] else: discounted_amount = discounted_amounts[line.tax_id.filtered(lambda t: t.amount_type != 'fixed')] if discounted_amount == 0: continue remaining = remaining_amount_per_line[line] consumed = min(remaining, discounted_amount) if lines.reward_id.program_id.is_payment_program: discounted_amounts[lines.tax_id.filtered(lambda t: t.amount_type != 'fixed')] -= consumed else: discounted_amounts[line.tax_id.filtered(lambda t: t.amount_type != 'fixed')] -= consumed remaining_amount_per_line[line] -= consumed discountable = 0 discountable_per_tax = defaultdict(int) for line in lines_to_discount: discountable += remaining_amount_per_line[line] line_discountable = line.price_unit * line.product_uom_qty * (1 - (line.discount or 0.0) / 100.0) # line_discountable is the same as in a 'order' discount # but first multiplied by a factor for the taxes to apply # and then multiplied by another factor coming from the discountable taxes = line.tax_id.filtered(lambda t: t.amount_type != 'fixed') discountable_per_tax[taxes] += line_discountable *\ (remaining_amount_per_line[line] / line.price_total) return discountable, discountable_per_tax def _get_reward_values_discount(self, reward, coupon, **kwargs): self.ensure_one() assert reward.reward_type == 'discount' # Figure out which lines are concerned by the discount # cheapest_line = self.env['sale.order.line'] discountable = 0 discountable_per_tax = defaultdict(int) reward_applies_on = reward.discount_applicability sequence = max(self.order_line.filtered(lambda x: not x.is_reward_line).mapped('sequence'), default=10) + 1 if reward_applies_on == 'order': discountable, discountable_per_tax = self._discountable_order(reward) elif reward_applies_on == 'specific': discountable, discountable_per_tax = self._discountable_specific(reward) elif reward_applies_on == 'cheapest': discountable, discountable_per_tax = self._discountable_cheapest(reward) if not discountable: if not reward.program_id.is_payment_program and any(line.reward_id.program_id.is_payment_program for line in self.order_line): return [{ 'name': _("TEMPORARY DISCOUNT LINE"), 'product_id': reward.discount_line_product_id.id, 'price_unit': 0, 'product_uom_qty': 0, 'product_uom': reward.discount_line_product_id.uom_id.id, 'reward_id': reward.id, 'coupon_id': coupon.id, 'points_cost': 0, 'reward_identifier_code': _generate_random_reward_code(), 'sequence': sequence, 'tax_id': [(Command.CLEAR, 0, 0)] }] raise UserError(_('There is nothing to discount')) max_discount = reward.currency_id._convert(reward.discount_max_amount, self.currency_id, self.company_id, fields.Date.today()) or float('inf') # discount should never surpass the order's current total amount max_discount = min(self.amount_total, max_discount) if reward.discount_mode == 'per_point': points = self._get_real_points_for_coupon(coupon) if not reward.program_id.is_payment_program: # Rewards cannot be partially offered to customers points = points // reward.required_points * reward.required_points max_discount = min(max_discount, reward.currency_id._convert(reward.discount * points, self.currency_id, self.company_id, fields.Date.today())) elif reward.discount_mode == 'per_order': max_discount = min(max_discount, reward.currency_id._convert(reward.discount, self.currency_id, self.company_id, fields.Date.today())) elif reward.discount_mode == 'percent': max_discount = min(max_discount, discountable * (reward.discount / 100)) # Discount per taxes reward_code = _generate_random_reward_code() point_cost = reward.required_points if not reward.clear_wallet else self._get_real_points_for_coupon(coupon) if reward.discount_mode == 'per_point' and not reward.clear_wallet: # Calculate the actual point cost if the cost is per point converted_discount = self.currency_id._convert(min(max_discount, discountable), reward.currency_id, self.company_id, fields.Date.today()) point_cost = converted_discount / reward.discount # Gift cards and eWallets are considered gift cards and should not have any taxes if reward.program_id.is_payment_program: return [{ 'name': reward.description, 'product_id': reward.discount_line_product_id.id, 'price_unit': -min(max_discount, discountable), 'product_uom_qty': 1.0, 'product_uom': reward.discount_line_product_id.uom_id.id, 'reward_id': reward.id, 'coupon_id': coupon.id, 'points_cost': point_cost, 'reward_identifier_code': reward_code, 'sequence': sequence, 'tax_id': [(Command.CLEAR, 0, 0)], }] discount_factor = min(1, (max_discount / discountable)) if discountable else 1 mapped_taxes = {tax: self.fiscal_position_id.map_tax(tax) for tax in discountable_per_tax} reward_dict = {tax: { 'name': _( 'Discount: %(desc)s%(tax_str)s', desc=reward.description, tax_str=len(discountable_per_tax) and any(t.name for t in mapped_taxes[tax]) and _(' - On product with the following taxes: %(taxes)s', taxes=", ".join(mapped_taxes[tax].mapped('name'))) or '', ), 'product_id': reward.discount_line_product_id.id, 'price_unit': -(price * discount_factor), 'product_uom_qty': 1.0, 'product_uom': reward.discount_line_product_id.uom_id.id, 'reward_id': reward.id, 'coupon_id': coupon.id, 'points_cost': 0, 'reward_identifier_code': reward_code, 'sequence': sequence, 'tax_id': [(Command.CLEAR, 0, 0)] + [(Command.LINK, tax.id, False) for tax in mapped_taxes[tax]] } for tax, price in discountable_per_tax.items() if price} # We only assign the point cost to one line to avoid counting the cost multiple times if reward_dict: reward_dict[next(iter(reward_dict))]['points_cost'] = point_cost # Returning .values() directly does not return a subscribable list return list(reward_dict.values()) def _get_program_domain(self): """ Returns the base domain that all programs have to comply to. """ self.ensure_one() today = fields.Date.context_today(self) return [('active', '=', True), ('sale_ok', '=', True), ('company_id', 'in', (self.company_id.id, False)), '|', ('pricelist_ids', '=', False), ('pricelist_ids', 'in', [self.pricelist_id.id]), '|', ('date_from', '=', False), ('date_from', '<=', today), '|', ('date_to', '=', False), ('date_to', '>=', today)] def _get_trigger_domain(self): """ Returns the base domain that all triggers have to comply to. """ self.ensure_one() today = fields.Date.context_today(self) return [('active', '=', True), ('program_id.sale_ok', '=', True), ('company_id', 'in', (self.company_id.id, False)), '|', ('program_id.pricelist_ids', '=', False), ('program_id.pricelist_ids', 'in', [self.pricelist_id.id]), '|', ('program_id.date_from', '=', False), ('program_id.date_from', '<=', today), '|', ('program_id.date_to', '=', False), ('program_id.date_to', '>=', today)] def _get_applicable_program_points(self, domain=None): """ Returns a dict with the points per program for each (automatic) program that is applicable """ self.ensure_one() if not domain: domain = [('trigger', '=', 'auto')] # Make sure domain always complies with the order's domain rules domain = expression.AND([self._get_program_domain(), domain]) # No other way than to test all programs to the order programs = self.env['loyalty.program'].search(domain) all_status = self._program_check_compute_points(programs) program_points = {p: status['points'][0] for p, status in all_status.items() if 'points' in status} return program_points def _get_points_programs(self): """ Returns all programs that give points on the current order. """ self.ensure_one() return self.coupon_point_ids.coupon_id.program_id def _get_reward_programs(self): """ Returns all programs that are being used for rewards. """ self.ensure_one() return self.order_line.reward_id.program_id def _get_reward_coupons(self): """ Returns all coupons that are a reward. """ self.ensure_one() return self.coupon_point_ids.coupon_id.filtered(lambda c: c.program_id.applies_on == 'future') def _get_applied_programs(self): """ Returns all applied programs on current order. Applied programs is the combination of both new points for your order and the programs linked to rewards. """ self.ensure_one() return self._get_points_programs() | self._get_reward_programs() def _compute_invoice_status(self): # Handling of a specific situation: an order contains # a product invoiced on delivery and a promo line invoiced # on order. We would avoid having the invoice status 'to_invoice' # if the created invoice will only contain the promotion line super()._compute_invoice_status() for order in self: if order.invoice_status != 'to invoice': continue if not any(not line.is_reward_line and line.invoice_status == 'to invoice' for line in order.order_line): order.invoice_status = 'no' def _get_invoiceable_lines(self, final=False): """ Ensures we cannot invoice only reward lines. Since promotion lines are specified with service products, those lines are directly invoiceable when the order is confirmed which can result in invoices containing only promotion lines. To avoid those cases, we allow the invoicing of promotion lines if at least another 'basic' lines is also invoiceable. """ invoiceable_lines = super()._get_invoiceable_lines(final) for line in invoiceable_lines: if not line.is_reward_line: return invoiceable_lines return self.env['sale.order.line'] def _recompute_prices(self): """Recompute coupons/promotions after pricelist prices reset.""" super()._recompute_prices() if any(line.is_reward_line for line in self.order_line): self._update_programs_and_rewards() def _get_point_changes(self): """ Returns the changes in points per coupon as a dict. Used when validating/cancelling an order """ points_per_coupon = defaultdict(lambda: 0) for coupon_point in self.coupon_point_ids: points_per_coupon[coupon_point.coupon_id] += coupon_point.points for line in self.order_line: if not line.reward_id or not line.coupon_id: continue points_per_coupon[line.coupon_id] -= line.points_cost return points_per_coupon def _get_real_points_for_coupon(self, coupon, post_confirm=False): """ Returns the actual points usable for this coupon for this order. Set pos_confirm to True to include points for future orders. This is calculated by taking the points on the coupon, the points the order will give to the coupon (if applicable) and removing the points taken by already applied rewards. """ self.ensure_one() points = coupon.points if coupon.program_id.applies_on != 'future' and self.state not in ('sale', 'done'): # Points that will be given by the order upon confirming the order points += self.coupon_point_ids.filtered(lambda p: p.coupon_id == coupon).points # Points already used by rewards points -= sum(self.order_line.filtered(lambda l: l.coupon_id == coupon).mapped('points_cost')) points = coupon.currency_id.round(points) return points def _add_points_for_coupon(self, coupon_points): """ Updates (or creates) an entry in coupon_point_ids for the given coupons. """ self.ensure_one() if self.state == 'sale': for coupon, points in coupon_points.items(): coupon.sudo().points += points for pe in self.coupon_point_ids.sudo(): if pe.coupon_id in coupon_points: pe.points = coupon_points.pop(pe.coupon_id) if coupon_points: self.sudo().with_context(tracking_disable=True).write({ 'coupon_point_ids': [(0, 0, { 'coupon_id': coupon.id, 'points': points, }) for coupon, points in coupon_points.items()] }) def _remove_program_from_points(self, programs): self.coupon_point_ids.filtered(lambda p: p.coupon_id.program_id in programs).sudo().unlink() def _get_reward_line_values(self, reward, coupon, **kwargs): self.ensure_one() self = self.with_context(lang=self._get_lang()) reward = reward.with_context(lang=self._get_lang()) if reward.reward_type == 'discount': return self._get_reward_values_discount(reward, coupon, **kwargs) elif reward.reward_type == 'product': return self._get_reward_values_product(reward, coupon, **kwargs) def _write_vals_from_reward_vals(self, reward_vals, old_lines, delete=True): """ Update, create new reward line and delete old lines in one write on `order_line` Returns the untouched old lines. """ self.ensure_one() command_list = [] for vals, line in zip(reward_vals, old_lines): command_list.append((Command.UPDATE, line.id, vals)) if len(reward_vals) > len(old_lines): command_list.extend((Command.CREATE, 0, vals) for vals in reward_vals[len(old_lines):]) elif len(reward_vals) < len(old_lines) and delete: command_list.extend((Command.DELETE, line.id) for line in old_lines[len(reward_vals):]) self.write({'order_line': command_list}) return self.env['sale.order.line'] if delete else old_lines[len(reward_vals):] def _apply_program_reward(self, reward, coupon, **kwargs): """ Applies the reward to the order provided the given coupon has enough points. This method does not check for program rules. This method also assumes the points added by the program triggers have already been computed. The temporary points are used if the program is applicable to the current order. Returns a dict containing the error message or empty if everything went correctly. NOTE: A call to `_update_programs_and_rewards` is expected to reorder the discounts. """ self.ensure_one() # Use the old lines before creating new ones. These should already be in a 'reset' state. old_reward_lines = kwargs.get('old_lines', self.env['sale.order.line']) if reward.is_global_discount: global_discount_reward_lines = self._get_applied_global_discount_lines() global_discount_reward = global_discount_reward_lines.reward_id if global_discount_reward and global_discount_reward != reward and global_discount_reward.discount >= reward.discount: return {'error': _('A better global discount is already applied.')} elif global_discount_reward and global_discount_reward != reward: # Invalidate the old global discount as it may impact the new discount to apply global_discount_reward_lines._reset_loyalty(True) old_reward_lines |= global_discount_reward_lines if not reward.program_id.is_nominative and reward.program_id.applies_on == 'future' and coupon in self.coupon_point_ids.coupon_id: return {'error': _('The coupon can only be claimed on future orders.')} elif self._get_real_points_for_coupon(coupon) < reward.required_points: return {'error': _('The coupon does not have enough points for the selected reward.')} reward_vals = self._get_reward_line_values(reward, coupon, **kwargs) self._write_vals_from_reward_vals(reward_vals, old_reward_lines) return {} def _get_claimable_rewards(self, forced_coupons=None): """ Fetch all rewards that are currently claimable from all concerned coupons, meaning coupons from applied programs and applied rewards or the coupons given as parameter. Returns a dict containing the all the claimable rewards grouped by coupon. Coupons that can not claim any reward are not contained in the result. """ self.ensure_one() all_coupons = forced_coupons or (self.coupon_point_ids.coupon_id | self.order_line.coupon_id | self.applied_coupon_ids) has_payment_reward = any(line.reward_id.program_id.is_payment_program for line in self.order_line) total_is_zero = float_is_zero(self.amount_total, precision_digits=2) result = defaultdict(lambda: self.env['loyalty.reward']) global_discount_reward = self._get_applied_global_discount() for coupon in all_coupons: points = self._get_real_points_for_coupon(coupon) for reward in coupon.program_id.reward_ids: if reward.is_global_discount and global_discount_reward and global_discount_reward.discount >= reward.discount: continue # Discounts are not allowed if the total is zero unless there is a payment reward, in which case we allow discounts. # If the total is 0 again without the payment reward it will be removed. if reward.reward_type == 'discount' and total_is_zero and (not has_payment_reward or reward.program_id.is_payment_program): continue if points >= reward.required_points: result[coupon] |= reward return result def _allow_nominative_programs(self): """ Whether or not this order may use nominative programs. """ self.ensure_one() return True def _update_programs_and_rewards(self): """ Updates applied programs's given points with the current state of the order. Checks automatic programs for applicability. Updates applied rewards using the new points and the current state of the order (for example with % discounts). """ self.ensure_one() # +===================================================+ # | STEP 1: Retrieve all applicable programs | # +===================================================+ # Automatically load in eWallet coupons if self._allow_nominative_programs(): ewallet_coupons = self.env['loyalty.card'].search( [('id', 'not in', self.applied_coupon_ids.ids), ('partner_id', '=', self.partner_id.id), ('points', '>', 0), ('program_id.program_type', '=', 'ewallet')]) if ewallet_coupons: self.applied_coupon_ids += ewallet_coupons # Programs that are applied to the order and count points points_programs = self._get_points_programs() # Coupon programs that require the program's rules to match but do not count for points coupon_programs = self.applied_coupon_ids.program_id # Programs that are automatic and not yet applied program_domain = self._get_program_domain() domain = expression.AND([program_domain, [('id', 'not in', points_programs.ids), ('trigger', '=', 'auto'), ('rule_ids.mode', '=', 'auto')]]) automatic_programs = self.env['loyalty.program'].search(domain).filtered(lambda p: not p.limit_usage or p.total_order_count < p.max_usage) all_programs_to_check = points_programs | coupon_programs | automatic_programs all_coupons = self.coupon_point_ids.coupon_id | self.applied_coupon_ids # First basic check using the program_domain -> for example if a program gets archived mid quotation domain_matching_programs = all_programs_to_check.filtered_domain(program_domain) all_programs_status = {p: {'error': 'error'} for p in all_programs_to_check - domain_matching_programs} # Compute applicability and points given for all programs that passed the domain check # Note that points are computed with reward lines present all_programs_status.update(self._program_check_compute_points(domain_matching_programs)) # Delay any unlink to the end of the function since they cause a full cache invalidation lines_to_unlink = self.env['sale.order.line'] coupons_to_unlink = self.env['loyalty.card'] point_entries_to_unlink = self.env['sale.order.coupon.points'] # Remove any coupons that are expired self.applied_coupon_ids = self.applied_coupon_ids.filtered(lambda c: (not c.expiration_date or c.expiration_date >= fields.Date.today()) ) point_ids_per_program = defaultdict(lambda: self.env['sale.order.coupon.points']) for pe in self.coupon_point_ids: # Remove any point entry for a coupon that does not belong to the customer if pe.coupon_id.partner_id and pe.coupon_id.partner_id != self.partner_id: pe.points = 0 point_entries_to_unlink |= pe else: point_ids_per_program[pe.coupon_id.program_id] |= pe # +==========================================+ # | STEP 2: Update applied programs | # +==========================================+ # Programs that were not applied via a coupon for program in points_programs: status = all_programs_status[program] program_point_entries = point_ids_per_program[program] if 'error' in status: # Program is not applicable anymore coupons_from_order = program_point_entries.coupon_id.filtered(lambda c: c.order_id == self) all_coupons -= coupons_from_order # Invalidate those lines so that they don't impact anything further down the line program_reward_lines = self.order_line.filtered(lambda l: l.coupon_id in coupons_from_order) program_reward_lines._reset_loyalty(True) lines_to_unlink |= program_reward_lines # Delete coupon created by this order for this program if it is not nominative if not program.is_nominative: coupons_to_unlink |= coupons_from_order else: # Only remove the coupon_point_id point_entries_to_unlink |= program_point_entries point_entries_to_unlink.points = 0 # Remove the code activated rules self.code_enabled_rule_ids -= program.rule_ids else: # Program stays applicable, update our points all_point_changes = [p for p in status['points'] if p] if not all_point_changes and program.is_nominative: all_point_changes = [0] for pe, points in zip(program_point_entries.sudo(), all_point_changes): pe.points = points if len(program_point_entries) < len(all_point_changes): new_coupon_points = all_point_changes[len(program_point_entries):] # NOTE: Maybe we could batch the creation of coupons across multiple programs but this really only applies to gift cards new_coupons = self.env['loyalty.card'].with_context(loyalty_no_mail=True, tracking_disable=True).create([{ 'program_id': program.id, 'partner_id': False, 'points': 0, 'order_id': self.id, } for _ in new_coupon_points]) self._add_points_for_coupon({coupon: x for coupon, x in zip(new_coupons, new_coupon_points)}) elif len(program_point_entries) > len(all_point_changes): point_ids_to_unlink = program_point_entries[len(all_point_changes):] all_coupons -= point_ids_to_unlink.coupon_id coupons_to_unlink |= point_ids_to_unlink.coupon_id point_ids_to_unlink.points = 0 # Programs applied using a coupon applied_coupon_per_program = defaultdict(lambda: self.env['loyalty.card']) for coupon in self.applied_coupon_ids: applied_coupon_per_program[coupon.program_id] |= coupon for program in coupon_programs: if program not in domain_matching_programs or\ (program.applies_on == 'current' and 'error' in all_programs_status[program]): program_reward_lines = self.order_line.filtered(lambda l: l.coupon_id in applied_coupon_per_program[program]) program_reward_lines._reset_loyalty(True) lines_to_unlink |= program_reward_lines self.applied_coupon_ids -= applied_coupon_per_program[program] all_coupons -= applied_coupon_per_program[program] # +==========================================+ # | STEP 3: Update reward lines | # +==========================================+ # We will reuse these lines as much as possible, this resets the order in a reward-less state reward_line_pool = self.order_line.filtered(lambda l: l.reward_id and l.coupon_id)._reset_loyalty() seen_rewards = set() line_rewards = [] payment_rewards = [] # gift_card and ewallet are considered as payments and should always be applied last for line in self.order_line: if line.reward_identifier_code in seen_rewards or not line.reward_id or\ not line.coupon_id: continue seen_rewards.add(line.reward_identifier_code) if line.reward_id.program_id.is_payment_program: payment_rewards.append((line.reward_id, line.coupon_id, line.reward_identifier_code, line.product_id)) else: line_rewards.append((line.reward_id, line.coupon_id, line.reward_identifier_code, line.product_id)) for reward_key in itertools.chain(line_rewards, payment_rewards): coupon = reward_key[1] reward = reward_key[0] program = reward.program_id points = self._get_real_points_for_coupon(coupon) if coupon not in all_coupons or points < reward.required_points or program not in domain_matching_programs: # Reward is not applicable anymore, the reward lines will simply be removed at the end of this function continue try: values_list = self._get_reward_line_values(reward, coupon, product=reward_key[3]) except UserError: # It could happen that we have nothing to discount after changing the order. values_list = [] reward_line_pool = self._write_vals_from_reward_vals(values_list, reward_line_pool, delete=False) lines_to_unlink |= reward_line_pool # +==========================================+ # | STEP 4: Apply new programs | # +==========================================+ for program in automatic_programs: program_status = all_programs_status[program] if 'error' in program_status: continue self.__try_apply_program(program, False, program_status) # +==========================================+ # | STEP 5: Cleanup | # +==========================================+ order_line_update = [(Command.DELETE, line.id) for line in lines_to_unlink] if order_line_update: self.write({'order_line': order_line_update}) if coupons_to_unlink: coupons_to_unlink.sudo().unlink() if point_entries_to_unlink: point_entries_to_unlink.sudo().unlink() def _get_not_rewarded_order_lines(self): return self.order_line.filtered(lambda line: line.product_id and not line.reward_id) def _program_check_compute_points(self, programs): """ Checks the program validity from the order lines aswell as computing the number of points to add. Returns a dict containing the error message or the points that will be given with the keys 'points'. """ self.ensure_one() # Prepare quantities order_lines = self._get_not_rewarded_order_lines() products = order_lines.product_id products_qties = dict.fromkeys(products, 0) for line in order_lines: products_qties[line.product_id] += line.product_uom_qty # Contains the products that can be applied per rule products_per_rule = programs._get_valid_products(products) # Prepare amounts no_effect_lines = self._get_no_effect_on_threshold_lines() base_untaxed_amount = self.amount_untaxed - sum(line.price_subtotal for line in no_effect_lines) base_tax_amount = self.amount_tax - sum(line.price_tax for line in no_effect_lines) amounts_per_program = {p: {'untaxed': base_untaxed_amount, 'tax': base_tax_amount} for p in programs} for line in self.order_line: if not line.reward_id or line.reward_id.reward_type != 'discount': continue for program in programs: # Do not consider the program's discount + automatic discount lines for the amount to check. if line.reward_id.program_id.trigger == 'auto' or line.reward_id.program_id == program: amounts_per_program[program]['untaxed'] -= line.price_subtotal amounts_per_program[program]['tax'] -= line.price_tax result = {} for program in programs: untaxed_amount = amounts_per_program[program]['untaxed'] tax_amount = amounts_per_program[program]['tax'] # Used for error messages # By default False, but True if no rules and applies_on current -> misconfigured coupons program code_matched = not bool(program.rule_ids) and program.applies_on == 'current' # Stays false if all triggers have code and none have been activated minimum_amount_matched = code_matched product_qty_matched = code_matched points = 0 # Some rules may split their points per unit / money spent # (i.e. gift cards 2x50$ must result in two 50$ codes) rule_points = [] program_result = result.setdefault(program, dict()) for rule in program.rule_ids: if rule.mode == 'with_code' and rule not in self.code_enabled_rule_ids: continue code_matched = True rule_amount = rule._compute_amount(self.currency_id) if rule_amount > (rule.minimum_amount_tax_mode == 'incl' and (untaxed_amount + tax_amount) or untaxed_amount): continue minimum_amount_matched = True if not products_per_rule.get(rule): continue rule_products = products_per_rule[rule] ordered_rule_products_qty = sum(products_qties[product] for product in rule_products) if ordered_rule_products_qty < rule.minimum_qty or not rule_products: continue product_qty_matched = True if not rule.reward_point_amount: continue # Count all points separately if the order is for the future and the split option is enabled if program.applies_on == 'future' and rule.reward_point_split and rule.reward_point_mode != 'order': if rule.reward_point_mode == 'unit': rule_points.extend(rule.reward_point_amount for _ in range(int(ordered_rule_products_qty))) elif rule.reward_point_mode == 'money': for line in self.order_line: if line.is_reward_line or line.product_id not in rule_products or line.product_uom_qty <= 0: continue points_per_unit = float_round( (rule.reward_point_amount * line.price_total / line.product_uom_qty), precision_digits=2, rounding_method='DOWN') if not points_per_unit: continue rule_points.extend([points_per_unit] * int(line.product_uom_qty)) else: # All checks have been passed we can now compute the points to give if rule.reward_point_mode == 'order': points += rule.reward_point_amount elif rule.reward_point_mode == 'money': # Compute amount paid for rule # NOTE: this does not account for discounts -> 1 point per $ * (100$ - 30%) will result in 100 points amount_paid = sum(max(0, line.price_total) for line in order_lines if line.product_id in rule_products) points += float_round(rule.reward_point_amount * amount_paid, precision_digits=2, rounding_method='DOWN') elif rule.reward_point_mode == 'unit': points += rule.reward_point_amount * ordered_rule_products_qty # NOTE: for programs that are nominative we always allow the program to be 'applied' on the order # with 0 points so that `_get_claimable_rewards` returns the rewards associated with those programs if not program.is_nominative: if not code_matched: program_result['error'] = _("This program requires a code to be applied.") elif not minimum_amount_matched: program_result['error'] = _( 'A minimum of %(amount)s %(currency)s should be purchased to get the reward', amount=min(program.rule_ids.mapped('minimum_amount')), currency=program.currency_id.name, ) elif not product_qty_matched: program_result['error'] = _("You don't have the required product quantities on your sales order.") elif not self._allow_nominative_programs(): program_result['error'] = _("This program is not available for public users.") if 'error' not in program_result: points_result = [points] + rule_points program_result['points'] = points_result return result def __try_apply_program(self, program, coupon, status): self.ensure_one() all_points = status['points'] points = all_points[0] coupons = coupon or self.env['loyalty.card'] if coupon: if program.is_nominative: self._add_points_for_coupon({coupon: points}) elif not coupon: # If the program only applies on the current order it does not make sense to fetch already existing coupons if program.is_nominative: coupon = self.env['loyalty.card'].search( [('partner_id', '=', self.partner_id.id), ('program_id', '=', program.id)], limit=1) # Do not apply 'nominative' programs if no point is given and no coupon exists if not points and not coupon: return {'error': _('No card found for this loyalty program and no points will be given with this order.')} elif coupon: self._add_points_for_coupon({coupon: points}) coupons = coupon if not coupon: all_points = [p for p in all_points if p] partner = False # Loyalty programs and ewallets are nominative if program.is_nominative: partner = self.partner_id.id coupons = self.env['loyalty.card'].sudo().with_context(loyalty_no_mail=True, tracking_disable=True).create([{ 'program_id': program.id, 'partner_id': partner, 'points': 0, 'order_id': self.id, } for _ in all_points]) self._add_points_for_coupon({coupon: x for coupon, x in zip(coupons, all_points)}) return {'coupon': coupons} def _try_apply_program(self, program, coupon=None): """ Tries to apply a program using the coupon if provided. This function provides the full routine to apply a program, it will check for applicability aswell as creating the necessary coupons and co-models to give the points to the customer. This function does not apply any reward to the order, rewards have to be given manually. Returns a dict containing the error message or containing the associated coupon(s). """ self.ensure_one() # Basic checks if not program.filtered_domain(self._get_program_domain()): return {'error': _('The program is not available for this order.')} elif program in self._get_applied_programs(): return {'error': _('This program is already applied to this order.')} # Check for applicability from the program's triggers/rules. # This step should also compute the amount of points to give for that program on that order. status = self._program_check_compute_points(program)[program] if 'error' in status: return status return self.__try_apply_program(program, coupon, status) def _try_apply_code(self, code): """ Tries to apply a promotional code to the sales order. It can be either from a coupon or a program rule. Returns a dict with the following possible keys: - 'not_found': Populated with True if the code did not yield any result. - 'error': Any error message that could occur. OR The result of `_get_claimable_rewards` with the found or newly created coupon, it will be empty if the coupon was consumed completely. """ self.ensure_one() base_domain = self._get_trigger_domain() domain = expression.AND([base_domain, [('mode', '=', 'with_code'), ('code', '=', code)]]) rule = self.env['loyalty.rule'].search(domain) program = rule.program_id coupon = False if rule in self.code_enabled_rule_ids: return {'error': _('This promo code is already applied.')} # No trigger was found from the code, try to find a coupon if not program: coupon = self.env['loyalty.card'].search([('code', '=', code)]) if not coupon or\ not coupon.program_id.active or\ not coupon.program_id.reward_ids or\ not coupon.program_id.filtered_domain(self._get_program_domain()): return {'error': _('This code is invalid (%s).', code), 'not_found': True} elif coupon.expiration_date and coupon.expiration_date < fields.Date.today(): return {'error': _('This coupon is expired.')} elif coupon.points < min(coupon.program_id.reward_ids.mapped('required_points')): return {'error': _('This coupon has already been used.')} program = coupon.program_id if not program or not program.active: return {'error': _('This code is invalid (%s).', code), 'not_found': True} elif (program.limit_usage and program.total_order_count >= program.max_usage): return {'error': _('This code is expired (%s).', code)} # Rule will count the next time the points are updated if rule: self.code_enabled_rule_ids |= rule program_is_applied = program in self._get_points_programs() # Condition that need to apply program (if not applied yet): # current -> always # future -> if no coupon # nominative -> non blocking if card exists with points if coupon: self.applied_coupon_ids += coupon if program_is_applied: # Update the points for our programs, this will take the new trigger in account self._update_programs_and_rewards() elif program.applies_on != 'future' or not coupon: apply_result = self._try_apply_program(program, coupon) if 'error' in apply_result and (not program.is_nominative or (program.is_nominative and not coupon)): if rule: self.code_enabled_rule_ids -= rule if coupon: self.applied_coupon_ids -= coupon return apply_result coupon = apply_result.get('coupon', self.env['loyalty.card']) return self._get_claimable_rewards(forced_coupons=coupon)