# -*- coding: utf-8 -*- # Part of Talisman . See LICENSE file for full copyright and licensing details. import random import werkzeug.urls from collections import defaultdict from datetime import datetime, timedelta from odoo import api, exceptions, fields, models, _ from odoo.tools import sql class SignupError(Exception): pass def random_token(): # the token has an entropy of about 120 bits (6 bits/char * 20 chars) chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789' return ''.join(random.SystemRandom().choice(chars) for _ in range(20)) def now(**kwargs): return datetime.now() + timedelta(**kwargs) class ResPartner(models.Model): _inherit = 'res.partner' signup_token = fields.Char(copy=False, groups="base.group_erp_manager", compute='_compute_token', inverse='_inverse_token') signup_type = fields.Char(string='Signup Token Type', copy=False, groups="base.group_erp_manager") signup_expiration = fields.Datetime(copy=False, groups="base.group_erp_manager") signup_valid = fields.Boolean(compute='_compute_signup_valid', string='Signup Token is Valid') signup_url = fields.Char(compute='_compute_signup_url', string='Signup URL') def init(self): super().init() if not sql.column_exists(self.env.cr, self._table, "signup_token"): self.env.cr.execute("ALTER TABLE res_partner ADD COLUMN signup_token varchar") @api.depends('signup_token', 'signup_expiration') def _compute_signup_valid(self): dt = now() for partner, partner_sudo in zip(self, self.sudo()): partner.signup_valid = bool(partner_sudo.signup_token) and \ (not partner_sudo.signup_expiration or dt <= partner_sudo.signup_expiration) def _compute_signup_url(self): """ proxy for function field towards actual implementation """ result = self.sudo()._get_signup_url_for_action() for partner in self: if any(u._is_internal() for u in partner.user_ids if u != self.env.user): self.env['res.users'].check_access_rights('write') if any(u.has_group('base.group_portal') for u in partner.user_ids if u != self.env.user): self.env['res.partner'].check_access_rights('write') partner.signup_url = result.get(partner.id, False) def _compute_token(self): for partner in self: self.env.cr.execute('SELECT signup_token FROM res_partner WHERE id=%s', (partner._origin.id,)) partner.signup_token = self.env.cr.fetchone()[0] def _inverse_token(self): for partner in self: self.env.cr.execute('UPDATE res_partner SET signup_token = %s WHERE id=%s', (partner.signup_token or None, partner.id)) def _get_signup_url_for_action(self, url=None, action=None, view_type=None, menu_id=None, res_id=None, model=None): """ generate a signup url for the given partner ids and action, possibly overriding the url state components (menu_id, id, view_type) """ res = dict.fromkeys(self.ids, False) for partner in self: base_url = partner.get_base_url() # when required, make sure the partner has a valid signup token if self.env.context.get('signup_valid') and not partner.user_ids: partner.sudo().signup_prepare() route = 'login' # the parameters to encode for the query query = {'db': self.env.cr.dbname} if self.env.context.get('create_user'): query['signup_email'] = partner.email signup_type = self.env.context.get('signup_force_type_in_url', partner.sudo().signup_type or '') if signup_type: route = 'reset_password' if signup_type == 'reset' else signup_type if partner.sudo().signup_token and signup_type: query['token'] = partner.sudo().signup_token elif partner.user_ids: query['login'] = partner.user_ids[0].login else: continue # no signup token, no user, thus no signup url! if url: query['redirect'] = url else: fragment = dict() base = '/web#' if action == '/mail/view': base = '/mail/view?' elif action: fragment['action'] = action if view_type: fragment['view_type'] = view_type if menu_id: fragment['menu_id'] = menu_id if model: fragment['model'] = model if res_id: fragment['res_id'] = res_id if fragment: query['redirect'] = base + werkzeug.urls.url_encode(fragment) signup_url = "/web/%s?%s" % (route, werkzeug.urls.url_encode(query)) if not self.env.context.get('relative_url'): signup_url = werkzeug.urls.url_join(base_url, signup_url) res[partner.id] = signup_url return res def action_signup_prepare(self): return self.signup_prepare() def signup_get_auth_param(self): """ Get a signup token related to the partner if signup is enabled. If the partner already has a user, get the login parameter. """ if not self.env.user._is_internal() and not self.env.is_admin(): raise exceptions.AccessDenied() res = defaultdict(dict) allow_signup = self.env['res.users']._get_signup_invitation_scope() == 'b2c' for partner in self: partner = partner.sudo() if allow_signup and not partner.user_ids: partner.signup_prepare() res[partner.id]['auth_signup_token'] = partner.signup_token elif partner.user_ids: res[partner.id]['auth_login'] = partner.user_ids[0].login return res def signup_cancel(self): return self.write({'signup_token': False, 'signup_type': False, 'signup_expiration': False}) def signup_prepare(self, signup_type="signup", expiration=False): """ generate a new token for the partners with the given validity, if necessary :param expiration: the expiration datetime of the token (string, optional) """ for partner in self: if expiration or not partner.signup_valid: token = random_token() while self._signup_retrieve_partner(token): token = random_token() partner.write({'signup_token': token, 'signup_type': signup_type, 'signup_expiration': expiration}) return True @api.model def _signup_retrieve_partner(self, token, check_validity=False, raise_exception=False): """ find the partner corresponding to a token, and possibly check its validity :param token: the token to resolve :param check_validity: if True, also check validity :param raise_exception: if True, raise exception instead of returning False :return: partner (browse record) or False (if raise_exception is False) """ self.env.cr.execute("SELECT id FROM res_partner WHERE signup_token = %s AND active", (token,)) partner_id = self.env.cr.fetchone() partner = self.browse(partner_id[0]) if partner_id else None if not partner: if raise_exception: raise exceptions.UserError(_("Signup token '%s' is not valid", token)) return False if check_validity and not partner.signup_valid: if raise_exception: raise exceptions.UserError(_("Signup token '%s' is no longer valid", token)) return False return partner @api.model def signup_retrieve_info(self, token): """ retrieve the user info about the token :return: a dictionary with the user information: - 'db': the name of the database - 'token': the token, if token is valid - 'name': the name of the partner, if token is valid - 'login': the user login, if the user already exists - 'email': the partner email, if the user does not exist """ partner = self._signup_retrieve_partner(token, raise_exception=True) res = {'db': self.env.cr.dbname} if partner.signup_valid: res['token'] = token res['name'] = partner.name if partner.user_ids: res['login'] = partner.user_ids[0].login else: res['email'] = res['login'] = partner.email or '' return res