auth_signup/models/res_partner.py

198 lines
8.6 KiB
Python
Raw Permalink Normal View History

2024-05-03 12:44:03 +03:00
# -*- coding: utf-8 -*-
2024-05-21 15:09:15 +03:00
# Part of Talisman . See LICENSE file for full copyright and licensing details.
2024-05-03 12:44:03 +03:00
import random
import werkzeug.urls
from collections import defaultdict
from datetime import datetime, timedelta
from odoo import api, exceptions, fields, models, _
from odoo.tools import sql
class SignupError(Exception):
pass
def random_token():
# the token has an entropy of about 120 bits (6 bits/char * 20 chars)
chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
return ''.join(random.SystemRandom().choice(chars) for _ in range(20))
def now(**kwargs):
return datetime.now() + timedelta(**kwargs)
class ResPartner(models.Model):
_inherit = 'res.partner'
signup_token = fields.Char(copy=False, groups="base.group_erp_manager", compute='_compute_token', inverse='_inverse_token')
signup_type = fields.Char(string='Signup Token Type', copy=False, groups="base.group_erp_manager")
signup_expiration = fields.Datetime(copy=False, groups="base.group_erp_manager")
signup_valid = fields.Boolean(compute='_compute_signup_valid', string='Signup Token is Valid')
signup_url = fields.Char(compute='_compute_signup_url', string='Signup URL')
def init(self):
super().init()
if not sql.column_exists(self.env.cr, self._table, "signup_token"):
self.env.cr.execute("ALTER TABLE res_partner ADD COLUMN signup_token varchar")
@api.depends('signup_token', 'signup_expiration')
def _compute_signup_valid(self):
dt = now()
for partner, partner_sudo in zip(self, self.sudo()):
partner.signup_valid = bool(partner_sudo.signup_token) and \
(not partner_sudo.signup_expiration or dt <= partner_sudo.signup_expiration)
def _compute_signup_url(self):
""" proxy for function field towards actual implementation """
result = self.sudo()._get_signup_url_for_action()
for partner in self:
if any(u._is_internal() for u in partner.user_ids if u != self.env.user):
self.env['res.users'].check_access_rights('write')
if any(u.has_group('base.group_portal') for u in partner.user_ids if u != self.env.user):
self.env['res.partner'].check_access_rights('write')
partner.signup_url = result.get(partner.id, False)
def _compute_token(self):
for partner in self:
self.env.cr.execute('SELECT signup_token FROM res_partner WHERE id=%s', (partner._origin.id,))
partner.signup_token = self.env.cr.fetchone()[0]
def _inverse_token(self):
for partner in self:
self.env.cr.execute('UPDATE res_partner SET signup_token = %s WHERE id=%s', (partner.signup_token or None, partner.id))
def _get_signup_url_for_action(self, url=None, action=None, view_type=None, menu_id=None, res_id=None, model=None):
""" generate a signup url for the given partner ids and action, possibly overriding
the url state components (menu_id, id, view_type) """
res = dict.fromkeys(self.ids, False)
for partner in self:
base_url = partner.get_base_url()
# when required, make sure the partner has a valid signup token
if self.env.context.get('signup_valid') and not partner.user_ids:
partner.sudo().signup_prepare()
route = 'login'
# the parameters to encode for the query
query = {'db': self.env.cr.dbname}
if self.env.context.get('create_user'):
query['signup_email'] = partner.email
signup_type = self.env.context.get('signup_force_type_in_url', partner.sudo().signup_type or '')
if signup_type:
route = 'reset_password' if signup_type == 'reset' else signup_type
if partner.sudo().signup_token and signup_type:
query['token'] = partner.sudo().signup_token
elif partner.user_ids:
query['login'] = partner.user_ids[0].login
else:
continue # no signup token, no user, thus no signup url!
if url:
query['redirect'] = url
else:
fragment = dict()
base = '/web#'
if action == '/mail/view':
base = '/mail/view?'
elif action:
fragment['action'] = action
if view_type:
fragment['view_type'] = view_type
if menu_id:
fragment['menu_id'] = menu_id
if model:
fragment['model'] = model
if res_id:
fragment['res_id'] = res_id
if fragment:
query['redirect'] = base + werkzeug.urls.url_encode(fragment)
signup_url = "/web/%s?%s" % (route, werkzeug.urls.url_encode(query))
if not self.env.context.get('relative_url'):
signup_url = werkzeug.urls.url_join(base_url, signup_url)
res[partner.id] = signup_url
return res
def action_signup_prepare(self):
return self.signup_prepare()
def signup_get_auth_param(self):
""" Get a signup token related to the partner if signup is enabled.
If the partner already has a user, get the login parameter.
"""
if not self.env.user._is_internal() and not self.env.is_admin():
raise exceptions.AccessDenied()
res = defaultdict(dict)
allow_signup = self.env['res.users']._get_signup_invitation_scope() == 'b2c'
for partner in self:
partner = partner.sudo()
if allow_signup and not partner.user_ids:
partner.signup_prepare()
res[partner.id]['auth_signup_token'] = partner.signup_token
elif partner.user_ids:
res[partner.id]['auth_login'] = partner.user_ids[0].login
return res
def signup_cancel(self):
return self.write({'signup_token': False, 'signup_type': False, 'signup_expiration': False})
def signup_prepare(self, signup_type="signup", expiration=False):
""" generate a new token for the partners with the given validity, if necessary
:param expiration: the expiration datetime of the token (string, optional)
"""
for partner in self:
if expiration or not partner.signup_valid:
token = random_token()
while self._signup_retrieve_partner(token):
token = random_token()
partner.write({'signup_token': token, 'signup_type': signup_type, 'signup_expiration': expiration})
return True
@api.model
def _signup_retrieve_partner(self, token, check_validity=False, raise_exception=False):
""" find the partner corresponding to a token, and possibly check its validity
:param token: the token to resolve
:param check_validity: if True, also check validity
:param raise_exception: if True, raise exception instead of returning False
:return: partner (browse record) or False (if raise_exception is False)
"""
self.env.cr.execute("SELECT id FROM res_partner WHERE signup_token = %s AND active", (token,))
partner_id = self.env.cr.fetchone()
partner = self.browse(partner_id[0]) if partner_id else None
if not partner:
if raise_exception:
raise exceptions.UserError(_("Signup token '%s' is not valid", token))
return False
if check_validity and not partner.signup_valid:
if raise_exception:
raise exceptions.UserError(_("Signup token '%s' is no longer valid", token))
return False
return partner
@api.model
def signup_retrieve_info(self, token):
""" retrieve the user info about the token
:return: a dictionary with the user information:
- 'db': the name of the database
- 'token': the token, if token is valid
- 'name': the name of the partner, if token is valid
- 'login': the user login, if the user already exists
- 'email': the partner email, if the user does not exist
"""
partner = self._signup_retrieve_partner(token, raise_exception=True)
res = {'db': self.env.cr.dbname}
if partner.signup_valid:
res['token'] = token
res['name'] = partner.name
if partner.user_ids:
res['login'] = partner.user_ids[0].login
else:
res['email'] = res['login'] = partner.email or ''
return res