# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.

import random
import werkzeug.urls

from collections import defaultdict
from datetime import datetime, timedelta

from odoo import api, exceptions, fields, models, _
from odoo.tools import sql


class SignupError(Exception):
    """ Signup-specific exception type (declared here, not raised in this module). """


def random_token():
    """ Return a random 20-character alphanumeric token.

    Characters are drawn with ``random.SystemRandom`` from a 62-symbol
    alphabet, i.e. slightly under 6 bits per character (about 119 bits).
    """
    chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
    # a single generator instance is enough for the whole token
    rng = random.SystemRandom()
    return ''.join(rng.choice(chars) for _ in range(20))


def now(**kwargs):
    """ Return ``datetime.now()`` shifted by ``timedelta(**kwargs)``. """
    return datetime.now() + timedelta(**kwargs)


class ResPartner(models.Model):
    _inherit = 'res.partner'

    # signup_token is read and written through raw SQL (see _compute_token /
    # _inverse_token); its column is created by hand in init().
    signup_token = fields.Char(
        copy=False, groups="base.group_erp_manager",
        compute='_compute_token', inverse='_inverse_token')
    signup_type = fields.Char(
        string='Signup Token Type', copy=False, groups="base.group_erp_manager")
    signup_expiration = fields.Datetime(copy=False, groups="base.group_erp_manager")
    signup_valid = fields.Boolean(
        compute='_compute_signup_valid', string='Signup Token is Valid')
    signup_url = fields.Char(compute='_compute_signup_url', string='Signup URL')

    def init(self):
        """ Create the ``signup_token`` column on ``res_partner`` if it is missing. """
        super().init()
        if not sql.column_exists(self.env.cr, self._table, "signup_token"):
            self.env.cr.execute("ALTER TABLE res_partner ADD COLUMN signup_token varchar")

    @api.depends('signup_token', 'signup_expiration')
    def _compute_signup_valid(self):
        """ A token is valid when it is set and either has no expiration or
            its expiration is not in the past. """
        dt = now()
        # the token fields are group-restricted, hence the sudo() twin records
        for partner, partner_sudo in zip(self, self.sudo()):
            partner.signup_valid = bool(partner_sudo.signup_token) and \
                (not partner_sudo.signup_expiration or dt <= partner_sudo.signup_expiration)

    def _compute_signup_url(self):
        """ proxy for function field towards actual implementation """
        result = self.sudo()._get_signup_url_for_action()
        for partner in self:
            # exposing a signup url for someone else's user requires write access
            if any(u._is_internal() for u in partner.user_ids if u != self.env.user):
                self.env['res.users'].check_access_rights('write')
            if any(u.has_group('base.group_portal') for u in partner.user_ids if u != self.env.user):
                self.env['res.partner'].check_access_rights('write')
            partner.signup_url = result.get(partner.id, False)

    def _compute_token(self):
        """ Load ``signup_token`` straight from the ``res_partner`` column.

        All tokens are fetched with a single query. Records that have no
        database row (no ``_origin.id``, or a row that cannot be found) get
        ``False`` instead of failing on an empty ``fetchone()`` result.
        """
        origin_ids = tuple({partner._origin.id for partner in self if partner._origin.id})
        tokens = {}
        if origin_ids:
            self.env.cr.execute(
                'SELECT id, signup_token FROM res_partner WHERE id IN %s', (origin_ids,))
            tokens = dict(self.env.cr.fetchall())
        for partner in self:
            partner.signup_token = tokens.get(partner._origin.id, False)

    def _inverse_token(self):
        """ Store ``signup_token`` in the ``res_partner`` column (empty values as NULL). """
        for partner in self:
            self.env.cr.execute(
                'UPDATE res_partner SET signup_token = %s WHERE id=%s',
                (partner.signup_token or None, partner.id))

    def _get_signup_url_for_action(self, url=None, action=None, view_type=None, menu_id=None, res_id=None, model=None):
        """ generate a signup url for the given partner ids and action, possibly overriding
            the url state components (menu_id, id, view_type)

            :return: dict mapping each partner id to its url, or False when the
                partner has neither a usable signup token nor a user
        """
        res = dict.fromkeys(self.ids, False)
        for partner in self:
            partner_sudo = partner.sudo()
            base_url = partner.get_base_url()
            # when required, make sure the partner has a valid signup token
            if self.env.context.get('signup_valid') and not partner.user_ids:
                partner_sudo.signup_prepare()

            route = 'login'
            # the parameters to encode for the query
            query = {'db': self.env.cr.dbname}

            if self.env.context.get('create_user'):
                query['signup_email'] = partner.email

            signup_type = self.env.context.get(
                'signup_force_type_in_url', partner_sudo.signup_type or '')
            if signup_type:
                route = 'reset_password' if signup_type == 'reset' else signup_type

            if partner_sudo.signup_token and signup_type:
                query['token'] = partner_sudo.signup_token
            elif partner.user_ids:
                query['login'] = partner.user_ids[0].login
            else:
                continue  # no signup token, no user, thus no signup url!

            if url:
                query['redirect'] = url
            else:
                fragment = {}
                base = '/web#'
                if action == '/mail/view':
                    base = '/mail/view?'
                elif action:
                    fragment['action'] = action
                if view_type:
                    fragment['view_type'] = view_type
                if menu_id:
                    fragment['menu_id'] = menu_id
                if model:
                    fragment['model'] = model
                if res_id:
                    fragment['res_id'] = res_id

                if fragment:
                    query['redirect'] = base + werkzeug.urls.url_encode(fragment)

            signup_url = "/web/%s?%s" % (route, werkzeug.urls.url_encode(query))
            if not self.env.context.get('relative_url'):
                signup_url = werkzeug.urls.url_join(base_url, signup_url)
            res[partner.id] = signup_url

        return res

    def action_signup_prepare(self):
        """ Action entry point delegating to :meth:`signup_prepare`. """
        return self.signup_prepare()

    def signup_get_auth_param(self):
        """ Get a signup token related to the partner if signup is enabled.
            If the partner already has a user, get the login parameter.

            :return: dict mapping partner ids to a dict holding either
                'auth_signup_token' or 'auth_login'
            :raise AccessDenied: if the current user is neither internal nor admin
        """
        if not self.env.user._is_internal() and not self.env.is_admin():
            raise exceptions.AccessDenied()

        res = defaultdict(dict)

        allow_signup = self.env['res.users']._get_signup_invitation_scope() == 'b2c'
        for partner in self:
            partner = partner.sudo()
            if allow_signup and not partner.user_ids:
                partner.signup_prepare()
                res[partner.id]['auth_signup_token'] = partner.signup_token
            elif partner.user_ids:
                res[partner.id]['auth_login'] = partner.user_ids[0].login
        return res

    def signup_cancel(self):
        """ Clear the signup token, type and expiration of the partners. """
        return self.write({'signup_token': False, 'signup_type': False, 'signup_expiration': False})

    def signup_prepare(self, signup_type="signup", expiration=False):
        """ generate a new token for the partners with the given validity, if necessary

            :param signup_type: value written to ``signup_type`` along with the token
            :param expiration: the expiration datetime of the token (string, optional)
            :return: True
        """
        for partner in self:
            if expiration or not partner.signup_valid:
                token = random_token()
                # draw again in the (unlikely) case the token is already in use
                while self._signup_retrieve_partner(token):
                    token = random_token()
                partner.write({'signup_token': token, 'signup_type': signup_type, 'signup_expiration': expiration})
        return True

    @api.model
    def _signup_retrieve_partner(self, token, check_validity=False, raise_exception=False):
        """ find the partner corresponding to a token, and possibly check its validity

            :param token: the token to resolve
            :param check_validity: if True, also check validity
            :param raise_exception: if True, raise exception instead of returning False
            :return: partner (browse record) or False (if raise_exception is False)
        """
        self.env.cr.execute("SELECT id FROM res_partner WHERE signup_token = %s AND active", (token,))
        partner_id = self.env.cr.fetchone()
        partner = self.browse(partner_id[0]) if partner_id else None
        if not partner:
            if raise_exception:
                raise exceptions.UserError(_("Signup token '%s' is not valid", token))
            return False
        if check_validity and not partner.signup_valid:
            if raise_exception:
                raise exceptions.UserError(_("Signup token '%s' is no longer valid", token))
            return False
        return partner

    @api.model
    def signup_retrieve_info(self, token):
        """ retrieve the user info about the token

            :return: a dictionary with the user information:
                - 'db': the name of the database
                - 'token': the token, if token is valid
                - 'name': the name of the partner, if token is valid
                - 'login': the user login, if the user already exists
                - 'email': the partner email, if the user does not exist
        """
        partner = self._signup_retrieve_partner(token, raise_exception=True)
        res = {'db': self.env.cr.dbname}
        if partner.signup_valid:
            res['token'] = token
            res['name'] = partner.name
        if partner.user_ids:
            res['login'] = partner.user_ids[0].login
        else:
            res['email'] = res['login'] = partner.email or ''
        return res