google_gmail/models/google_gmail_mixin.py

180 lines
7.1 KiB
Python
Raw Permalink Normal View History

# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import json
import logging
import time
import requests
from werkzeug.urls import url_encode, url_join
from odoo import _, api, fields, models, tools
from odoo.exceptions import AccessError, UserError
_logger = logging.getLogger(__name__)
class GoogleGmailMixin(models.AbstractModel):
_name = 'google.gmail.mixin'
_description = 'Google Gmail Mixin'
_SERVICE_SCOPE = 'https://mail.google.com/'
google_gmail_authorization_code = fields.Char(string='Authorization Code', groups='base.group_system', copy=False)
google_gmail_refresh_token = fields.Char(string='Refresh Token', groups='base.group_system', copy=False)
google_gmail_access_token = fields.Char(string='Access Token', groups='base.group_system', copy=False)
google_gmail_access_token_expiration = fields.Integer(string='Access Token Expiration Timestamp', groups='base.group_system', copy=False)
google_gmail_uri = fields.Char(compute='_compute_gmail_uri', string='URI', help='The URL to generate the authorization code from Google', groups='base.group_system')
@api.depends('google_gmail_authorization_code')
def _compute_gmail_uri(self):
Config = self.env['ir.config_parameter'].sudo()
google_gmail_client_id = Config.get_param('google_gmail_client_id')
google_gmail_client_secret = Config.get_param('google_gmail_client_secret')
base_url = self.get_base_url()
redirect_uri = url_join(base_url, '/google_gmail/confirm')
if not google_gmail_client_id or not google_gmail_client_secret:
self.google_gmail_uri = False
else:
for record in self:
google_gmail_uri = 'https://accounts.google.com/o/oauth2/v2/auth?%s' % url_encode({
'client_id': google_gmail_client_id,
'redirect_uri': redirect_uri,
'response_type': 'code',
'scope': self._SERVICE_SCOPE,
# access_type and prompt needed to get a refresh token
'access_type': 'offline',
'prompt': 'consent',
'state': json.dumps({
'model': record._name,
'id': record.id or False,
'csrf_token': record._get_gmail_csrf_token() if record.id else False,
})
})
record.google_gmail_uri = google_gmail_uri
def open_google_gmail_uri(self):
"""Open the URL to accept the Gmail permission.
This is done with an action, so we can force the user the save the form.
We need him to save the form so the current mail server record exist in DB, and
we can include the record ID in the URL.
"""
self.ensure_one()
if not self.env.user.has_group('base.group_system'):
raise AccessError(_('Only the administrator can link a Gmail mail server.'))
if not self.google_gmail_uri:
raise UserError(_('Please configure your Gmail credentials.'))
return {
'type': 'ir.actions.act_url',
'url': self.google_gmail_uri,
}
def _fetch_gmail_refresh_token(self, authorization_code):
"""Request the refresh token and the initial access token from the authorization code.
:return:
refresh_token, access_token, access_token_expiration
"""
response = self._fetch_gmail_token('authorization_code', code=authorization_code)
return (
response['refresh_token'],
response['access_token'],
int(time.time()) + response['expires_in'],
)
def _fetch_gmail_access_token(self, refresh_token):
"""Refresh the access token thanks to the refresh token.
:return:
access_token, access_token_expiration
"""
response = self._fetch_gmail_token('refresh_token', refresh_token=refresh_token)
return (
response['access_token'],
int(time.time()) + response['expires_in'],
)
def _fetch_gmail_token(self, grant_type, **values):
"""Generic method to request an access token or a refresh token.
Return the JSON response of the GMail API and manage the errors which can occur.
:param grant_type: Depends the action we want to do (refresh_token or authorization_code)
:param values: Additional parameters that will be given to the GMail endpoint
"""
Config = self.env['ir.config_parameter'].sudo()
google_gmail_client_id = Config.get_param('google_gmail_client_id')
google_gmail_client_secret = Config.get_param('google_gmail_client_secret')
base_url = self.get_base_url()
redirect_uri = url_join(base_url, '/google_gmail/confirm')
response = requests.post(
'https://oauth2.googleapis.com/token',
data={
'client_id': google_gmail_client_id,
'client_secret': google_gmail_client_secret,
'grant_type': grant_type,
'redirect_uri': redirect_uri,
**values,
},
timeout=5,
)
if not response.ok:
raise UserError(_('An error occurred when fetching the access token.'))
return response.json()
def _generate_oauth2_string(self, user, refresh_token):
"""Generate a OAuth2 string which can be used for authentication.
:param user: Email address of the Gmail account to authenticate
:param refresh_token: Refresh token for the given Gmail account
:return: The SASL argument for the OAuth2 mechanism.
"""
self.ensure_one()
now_timestamp = int(time.time())
if not self.google_gmail_access_token \
or not self.google_gmail_access_token_expiration \
or self.google_gmail_access_token_expiration < now_timestamp:
access_token, expiration = self._fetch_gmail_access_token(self.google_gmail_refresh_token)
self.write({
'google_gmail_access_token': access_token,
'google_gmail_access_token_expiration': expiration,
})
_logger.info(
'Google Gmail: fetch new access token. Expires in %i minutes',
(self.google_gmail_access_token_expiration - now_timestamp) // 60)
else:
_logger.info(
'Google Gmail: reuse existing access token. Expire in %i minutes',
(self.google_gmail_access_token_expiration - now_timestamp) // 60)
return 'user=%s\1auth=Bearer %s\1\1' % (user, self.google_gmail_access_token)
def _get_gmail_csrf_token(self):
"""Generate a CSRF token that will be verified in `google_gmail_callback`.
This will prevent a malicious person to make an admin user disconnect the mail servers.
"""
self.ensure_one()
_logger.info('Google Gmail: generate CSRF token for %s #%i', self._name, self.id)
return tools.misc.hmac(
env=self.env(su=True),
scope='google_gmail_oauth',
message=(self._name, self.id),
)