151 lines
5.4 KiB
Python
151 lines
5.4 KiB
Python
|
# -*- coding: utf-8 -*-
|
||
|
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
||
|
import babel.dates
|
||
|
import logging
|
||
|
|
||
|
from datetime import datetime, timedelta
|
||
|
|
||
|
from odoo import _, models
|
||
|
from odoo.exceptions import AccessDenied, UserError
|
||
|
from odoo.http import request
|
||
|
from odoo.tools.misc import babel_locale_parse, hmac
|
||
|
|
||
|
from odoo.addons.auth_totp.models.totp import hotp, TOTP
|
||
|
|
||
|
_logger = logging.getLogger(__name__)

# Rate limits applied to e-mail based 2FA actions, keyed by action type.
# Each value is a ``(limit, interval)`` pair: at most ``limit`` logged
# attempts are tolerated within the last ``interval`` seconds before the
# action is refused.
TOTP_RATE_LIMITS = {
    'send_email': (10, 3600),
    'code_check': (10, 3600),
}
|
||
|
|
||
|
|
||
|
class Users(models.Model):
    """Extend ``res.users`` with an e-mail based second authentication factor.

    When the ``auth_totp.policy`` configuration parameter requires it and no
    other MFA type is already reported by the parent implementation, users
    authenticate with a code sent to their e-mail address. Code checks and
    e-mail sends are rate limited per user and per remote IP address.
    """
    _inherit = 'res.users'

    def _mfa_type(self):
        """Return the MFA type that applies to this user.

        :return: the parent's value when it is not ``None``; otherwise
            ``'totp_mail'`` when the ``auth_totp.policy`` parameter is
            ``'all_required'``, or is ``'employee_required'`` and
            ``self._is_internal()`` is truthy; ``None`` in every other case.
        """
        mfa_type = super()._mfa_type()
        if mfa_type is not None:
            return mfa_type

        # Read the policy once instead of once per comparison.
        policy = self.env['ir.config_parameter'].sudo().get_param('auth_totp.policy')
        if policy == 'all_required':
            return 'totp_mail'
        # `_is_internal()` is only evaluated for the employee-only policy.
        if policy == 'employee_required' and self._is_internal():
            return 'totp_mail'
        return None

    def _mfa_url(self):
        """Return the URL of the MFA step for this user.

        :return: the parent's value when it is not ``None``; otherwise
            ``'/web/login/totp'`` when the MFA type is ``'totp_mail'``,
            else ``None``.
        """
        url = super()._mfa_url()
        if url is not None:
            return url
        if self._mfa_type() == 'totp_mail':
            return '/web/login/totp'
        return None

    def _totp_check(self, code):
        """Verify a second-factor code submitted by the user.

        The attempt is recorded against the ``'code_check'`` rate limit
        before anything else, so it is counted even when the verification is
        delegated to the parent implementation.

        :param code: the code to verify; it is passed as-is to ``TOTP.match``.
        :return: ``True`` on success for the mail flow, or whatever the
            parent implementation returns when the user's MFA type is not
            ``'totp_mail'``.
        :raises AccessDenied: when the code does not match, or when the rate
            limit has been reached.
        """
        self._totp_rate_limit('code_check')
        user = self.sudo()
        if user._mfa_type() != 'totp_mail':
            return super()._totp_check(code)

        key = user._get_totp_mail_key()
        # One-hour timestep and one-hour window, mirroring the hourly counter
        # used when the code is generated. NOTE(review): the exact range of
        # counters accepted depends on `TOTP.match` -- confirm in auth_totp.
        match = TOTP(key).match(code, window=3600, timestep=3600)
        if match is None:
            _logger.info("2FA check (mail): FAIL for %s %r", user, user.login)
            raise AccessDenied(_("Verification failed, please double-check the 6-digit code"))
        _logger.info("2FA check(mail): SUCCESS for %s %r", user, user.login)
        # A successful check clears this user's/IP's logged attempts for both
        # limit types.
        self._totp_rate_limit_purge('code_check')
        self._totp_rate_limit_purge('send_email')
        return True

    def _get_totp_mail_key(self):
        """Return the secret key used to generate and verify mailed codes.

        The key is computed by the ``hmac`` helper over the user's id, login
        and ``login_date`` under the ``'auth_totp_mail-code'`` scope, then
        encoded to bytes. Presumably the key therefore changes whenever
        ``login_date`` changes -- confirm against the helper's semantics.

        :return: the encoded key.
        """
        self.ensure_one()
        return hmac(self.env(su=True), 'auth_totp_mail-code', (self.id, self.login, self.login_date)).encode()

    def _get_totp_mail_code(self):
        """Compute the code to e-mail to the user and its validity duration.

        :return: a ``(code, expiration)`` tuple where ``code`` is the HOTP
            value for the current one-hour counter, left-padded with zeros to
            at least 6 characters, and ``expiration`` is a one-hour duration
            formatted by babel for the context language (falling back to the
            user's language).
        """
        self.ensure_one()

        key = self._get_totp_mail_key()

        # The HOTP counter is the number of whole hours since the epoch.
        now = datetime.now()
        counter = int(datetime.timestamp(now) / 3600)

        code = hotp(key, counter)
        validity = timedelta(seconds=3600)
        lang = babel_locale_parse(self.env.context.get('lang') or self.lang)
        expiration = babel.dates.format_timedelta(validity, locale=lang)

        return str(code).zfill(6), expiration

    def _send_totp_mail_code(self):
        """Send the authentication code e-mail to the user.

        The send is recorded against the ``'send_email'`` rate limit first.
        When a request is available, the device, browser, IP address and
        (if a GeoIP city name is known) location are passed to the template
        through the context.

        :raises AccessDenied: when the ``'send_email'`` rate limit is reached.
        :raises UserError: when the user has no e-mail address.
        """
        self.ensure_one()
        self._totp_rate_limit('send_email')

        if not self.email:
            raise UserError(_("Cannot send email: user %s has no email address.", self.name))

        template = self.env.ref('auth_totp_mail_enforce.mail_template_totp_mail_code').sudo()
        context = {}
        if request:
            device = request.httprequest.user_agent.platform
            browser = request.httprequest.user_agent.browser
            context.update({
                'location': None,
                'device': device.capitalize() if device else None,
                'browser': browser.capitalize() if browser else None,
                'ip': request.httprequest.environ['REMOTE_ADDR'],
            })
            if request.geoip.city.name:
                context['location'] = f"{request.geoip.city.name}, {request.geoip.country_name}"

        # Explicit values so the mail goes to the user's own address only.
        email_values = {
            'email_to': self.email,
            'email_cc': False,
            'auto_delete': True,
            'recipient_ids': [],
            'partner_ids': [],
            'scheduled_date': False,
        }
        # The send runs inside a savepoint; with `raise_exception=True` a
        # failure propagates to the caller.
        with self.env.cr.savepoint():
            template.with_context(**context).send_mail(
                self.id, force_send=True, raise_exception=True, email_values=email_values, email_layout_xmlid='mail.mail_notification_light'
            )

    def _totp_rate_limit(self, limit_type):
        """Record an attempt of ``limit_type`` and enforce its rate limit.

        Attempts are counted per user, per limit type and per remote IP
        address over the interval configured in ``TOTP_RATE_LIMITS``. When
        the limit is already reached, the attempt is refused and no new log
        entry is created.

        :param limit_type: a key of ``TOTP_RATE_LIMITS``.
        :raises AccessDenied: when the number of logged attempts within the
            interval has reached the limit.
        :raises KeyError: when ``limit_type`` is not a known limit type.
        """
        self.ensure_one()
        assert request, "A request is required to be able to rate limit TOTP related actions"
        # Subscript rather than `.get()`: an unknown type fails with a clear
        # KeyError instead of a TypeError while unpacking `None`.
        limit, interval = TOTP_RATE_LIMITS[limit_type]
        RateLimitLog = self.env['auth.totp.rate.limit.log'].sudo()
        ip = request.httprequest.environ['REMOTE_ADDR']
        # NOTE(review): compares a naive `datetime.now()` with `create_date`;
        # assumes the server clock and stored dates share a timezone -- confirm.
        domain = [
            ('user_id', '=', self.id),
            ('create_date', '>=', datetime.now() - timedelta(seconds=interval)),
            ('limit_type', '=', limit_type),
            ('ip', '=', ip),
        ]
        count = RateLimitLog.search_count(domain)
        if count >= limit:
            descriptions = {
                'send_email': _('You reached the limit of authentication mails sent for your account'),
                'code_check': _('You reached the limit of code verifications for your account'),
            }
            description = descriptions.get(limit_type)
            raise AccessDenied(description)
        RateLimitLog.create({
            'user_id': self.id,
            'ip': ip,
            'limit_type': limit_type,
        })

    def _totp_rate_limit_purge(self, limit_type):
        """Delete the rate limit logs of ``limit_type`` for this user and IP.

        Only entries matching the current user, the given limit type and the
        remote IP address of the current request are removed.

        :param limit_type: the limit type whose log entries are deleted.
        """
        self.ensure_one()
        assert request, "A request is required to be able to rate limit TOTP related actions"
        ip = request.httprequest.environ['REMOTE_ADDR']
        RateLimitLog = self.env['auth.totp.rate.limit.log'].sudo()
        RateLimitLog.search([
            ('user_id', '=', self.id),
            ('limit_type', '=', limit_type),
            ('ip', '=', ip),
        ]).unlink()
|