151 lines
5.4 KiB
Python

# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import babel.dates
import logging
from datetime import datetime, timedelta
from odoo import _, models
from odoo.exceptions import AccessDenied, UserError
from odoo.http import request
from odoo.tools.misc import babel_locale_parse, hmac
from odoo.addons.auth_totp.models.totp import hotp, TOTP
_logger = logging.getLogger(__name__)
TOTP_RATE_LIMITS = {
'send_email': (10, 3600),
'code_check': (10, 3600),
}
class Users(models.Model):
_inherit = 'res.users'
def _mfa_type(self):
r = super()._mfa_type()
if r is not None:
return r
ICP = self.env['ir.config_parameter'].sudo()
otp_required = False
if ICP.get_param('auth_totp.policy') == 'all_required':
otp_required = True
elif ICP.get_param('auth_totp.policy') == 'employee_required' and self._is_internal():
otp_required = True
if otp_required:
return 'totp_mail'
def _mfa_url(self):
r = super()._mfa_url()
if r is not None:
return r
if self._mfa_type() == 'totp_mail':
return '/web/login/totp'
def _totp_check(self, code):
self._totp_rate_limit('code_check')
user = self.sudo()
if user._mfa_type() != 'totp_mail':
return super()._totp_check(code)
key = user._get_totp_mail_key()
match = TOTP(key).match(code, window=3600, timestep=3600)
if match is None:
_logger.info("2FA check (mail): FAIL for %s %r", user, user.login)
raise AccessDenied(_("Verification failed, please double-check the 6-digit code"))
_logger.info("2FA check(mail): SUCCESS for %s %r", user, user.login)
self._totp_rate_limit_purge('code_check')
self._totp_rate_limit_purge('send_email')
return True
def _get_totp_mail_key(self):
self.ensure_one()
return hmac(self.env(su=True), 'auth_totp_mail-code', (self.id, self.login, self.login_date)).encode()
def _get_totp_mail_code(self):
self.ensure_one()
key = self._get_totp_mail_key()
now = datetime.now()
counter = int(datetime.timestamp(now) / 3600)
code = hotp(key, counter)
expiration = timedelta(seconds=3600)
lang = babel_locale_parse(self.env.context.get('lang') or self.lang)
expiration = babel.dates.format_timedelta(expiration, locale=lang)
return str(code).zfill(6), expiration
def _send_totp_mail_code(self):
self.ensure_one()
self._totp_rate_limit('send_email')
if not self.email:
raise UserError(_("Cannot send email: user %s has no email address.", self.name))
template = self.env.ref('auth_totp_mail_enforce.mail_template_totp_mail_code').sudo()
context = {}
if request:
device = request.httprequest.user_agent.platform
browser = request.httprequest.user_agent.browser
context.update({
'location': None,
'device': device and device.capitalize() or None,
'browser': browser and browser.capitalize() or None,
'ip': request.httprequest.environ['REMOTE_ADDR'],
})
if request.geoip.city.name:
context['location'] = f"{request.geoip.city.name}, {request.geoip.country_name}"
email_values = {
'email_to': self.email,
'email_cc': False,
'auto_delete': True,
'recipient_ids': [],
'partner_ids': [],
'scheduled_date': False,
}
with self.env.cr.savepoint():
template.with_context(**context).send_mail(
self.id, force_send=True, raise_exception=True, email_values=email_values, email_layout_xmlid='mail.mail_notification_light'
)
def _totp_rate_limit(self, limit_type):
self.ensure_one()
assert request, "A request is required to be able to rate limit TOTP related actions"
limit, interval = TOTP_RATE_LIMITS.get(limit_type)
RateLimitLog = self.env['auth.totp.rate.limit.log'].sudo()
ip = request.httprequest.environ['REMOTE_ADDR']
domain = [
('user_id', '=', self.id),
('create_date', '>=', datetime.now() - timedelta(seconds=interval)),
('limit_type', '=', limit_type),
('ip', '=', ip),
]
count = RateLimitLog.search_count(domain)
if count >= limit:
descriptions = {
'send_email': _('You reached the limit of authentication mails sent for your account'),
'code_check': _('You reached the limit of code verifications for your account'),
}
description = descriptions.get(limit_type)
raise AccessDenied(description)
RateLimitLog.create({
'user_id': self.id,
'ip': ip,
'limit_type': limit_type,
})
def _totp_rate_limit_purge(self, limit_type):
self.ensure_one()
assert request, "A request is required to be able to rate limit TOTP related actions"
ip = request.httprequest.environ['REMOTE_ADDR']
RateLimitLog = self.env['auth.totp.rate.limit.log'].sudo()
RateLimitLog.search([
('user_id', '=', self.id),
('limit_type', '=', limit_type),
('ip', '=', ip),
]).unlink()