account_edi_proxy_client/models/account_edi_proxy_user.py

251 lines
12 KiB
Python
Raw Permalink Normal View History

from odoo import api, models, fields, _
from odoo.exceptions import UserError
from odoo.tools import index_exists
from .account_edi_proxy_auth import OdooEdiProxyAuth
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.fernet import Fernet
from psycopg2 import OperationalError
import requests
import uuid
import base64
import logging
_logger = logging.getLogger(__name__)
TIMEOUT = 30
class AccountEdiProxyError(Exception):
def __init__(self, code, message=False):
self.code = code
self.message = message
super().__init__(message or code)
class AccountEdiProxyClientUser(models.Model):
"""Represents a user of the proxy for an electronic invoicing format.
An edi_proxy_user has a unique identification on a specific format (for example, the vat for Peppol) which
allows to identify him when receiving a document addressed to him. It is linked to a specific company on a specific
Odoo database.
It also owns a key with which each file should be decrypted with (the proxy encrypt all the files with the public key).
"""
_name = 'account_edi_proxy_client.user'
_description = 'Account EDI proxy user'
active = fields.Boolean(default=True)
id_client = fields.Char(required=True)
company_id = fields.Many2one('res.company', string='Company', required=True,
default=lambda self: self.env.company)
edi_identification = fields.Char(required=True, help="The unique id that identifies this user, typically the vat")
private_key = fields.Binary(required=True, attachment=False, groups="base.group_system", help="The key to encrypt all the user's data")
private_key_filename = fields.Char(compute='_compute_private_key_filename')
refresh_token = fields.Char(groups="base.group_system")
proxy_type = fields.Selection(selection=[], required=True)
edi_mode = fields.Selection(
selection=[
('prod', 'Production mode'),
('test', 'Test mode'),
('demo', 'Demo mode'),
],
string='EDI operating mode',
)
_sql_constraints = [
('unique_id_client', 'unique(id_client)', 'This id_client is already used on another user.'),
('unique_active_edi_identification', '', 'This edi identification is already assigned to an active user'),
('unique_active_company_proxy', '', 'This company has an active user already created for this EDI type'),
]
def _auto_init(self):
super()._auto_init()
if not index_exists(self.env.cr, 'account_edi_proxy_client_user_unique_active_edi_identification'):
self.env.cr.execute("""
CREATE UNIQUE INDEX account_edi_proxy_client_user_unique_active_edi_identification
ON account_edi_proxy_client_user(edi_identification, proxy_type, edi_mode)
WHERE (active = True)
""")
if not index_exists(self.env.cr, 'account_edi_proxy_client_user_unique_active_company_proxy'):
self.env.cr.execute("""
CREATE UNIQUE INDEX account_edi_proxy_client_user_unique_active_company_proxy
ON account_edi_proxy_client_user(company_id, proxy_type, edi_mode)
WHERE (active = True)
""")
def _compute_private_key_filename(self):
for record in self:
record.private_key_filename = f'{record.id_client}_{record.edi_identification}.key'
def _get_proxy_urls(self):
# To extend
return {}
def _get_server_url(self, proxy_type=None, edi_mode=None):
proxy_type = proxy_type or self.proxy_type
edi_mode = edi_mode or self.edi_mode
proxy_urls = self._get_proxy_urls()
# letting this traceback in case of a KeyError, as that would mean something's wrong with the code
return proxy_urls[proxy_type][edi_mode]
def _get_proxy_users(self, company, proxy_type):
'''Returns proxy users associated with the given company and proxy type.
'''
return company.account_edi_proxy_client_ids.filtered(lambda u: u.proxy_type == proxy_type)
def _get_proxy_identification(self, company, proxy_type):
'''Returns the key that will identify company uniquely
within a specific proxy type and edi operating mode.
or raises a UserError (if the user didn't fill the related field).
TO OVERRIDE
'''
return False
def _make_request(self, url, params=False):
''' Make a request to proxy and handle the generic elements of the reponse (errors, new refresh token).
'''
payload = {
'jsonrpc': '2.0',
'method': 'call',
'params': params or {},
'id': uuid.uuid4().hex,
}
# Last barrier : in case the demo mode is not handled by the caller, we block access.
if self.edi_mode == 'demo':
raise AccountEdiProxyError("block_demo_mode", "Can't access the proxy in demo mode")
try:
response = requests.post(
url,
json=payload,
timeout=TIMEOUT,
headers={'content-type': 'application/json'},
auth=OdooEdiProxyAuth(user=self)).json()
except (ValueError, requests.exceptions.ConnectionError, requests.exceptions.MissingSchema, requests.exceptions.Timeout, requests.exceptions.HTTPError):
raise AccountEdiProxyError('connection_error',
_('The url that this service requested returned an error. The url it tried to contact was %s', url))
if 'error' in response:
message = _('The url that this service requested returned an error. The url it tried to contact was %s. %s', url, response['error']['message'])
if response['error']['code'] == 404:
message = _('The url that this service tried to contact does not exist. The url was %r', url)
raise AccountEdiProxyError('connection_error', message)
proxy_error = response['result'].pop('proxy_error', False)
if proxy_error:
error_code = proxy_error['code']
if error_code == 'refresh_token_expired':
self._renew_token()
self.env.cr.commit() # We do not want to lose it if in the _make_request below something goes wrong
return self._make_request(url, params)
if error_code == 'no_such_user':
# This error is also raised if the user didn't exchange data and someone else claimed the edi_identificaiton.
self.sudo().active = False
raise AccountEdiProxyError(error_code, proxy_error['message'] or False)
return response['result']
def _register_proxy_user(self, company, proxy_type, edi_mode):
''' Generate the public_key/private_key that will be used to encrypt the file, send a request to the proxy
to register the user with the public key and create the user with the private key.
:param company: the company of the user.
'''
# public_exponent=65537 is a default value that should be used most of the time, as per the documentation of cryptography.
# key_size=2048 is considered a reasonable default key size, as per the documentation of cryptography.
# see https://cryptography.io/en/latest/hazmat/primitives/asymmetric/rsa/
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
private_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
public_key = private_key.public_key()
public_pem = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
edi_identification = self._get_proxy_identification(company, proxy_type)
if edi_mode == 'demo':
# simulate registration
response = {'id_client': f'demo{company.id}', 'refresh_token': 'demo'}
else:
try:
# b64encode returns a bytestring, we need it as a string
response = self._make_request(self._get_server_url(proxy_type, edi_mode) + '/iap/account_edi/2/create_user', params={
'dbuuid': company.env['ir.config_parameter'].get_param('database.uuid'),
'company_id': company.id,
'edi_identification': edi_identification,
'public_key': base64.b64encode(public_pem).decode(),
'proxy_type': proxy_type,
})
except AccountEdiProxyError as e:
raise UserError(e.message)
if 'error' in response:
raise UserError(response['error'])
return self.create({
'id_client': response['id_client'],
'company_id': company.id,
'proxy_type': proxy_type,
'edi_mode': edi_mode,
'edi_identification': edi_identification,
'private_key': base64.b64encode(private_pem),
'refresh_token': response['refresh_token'],
})
def _renew_token(self):
''' Request the proxy for a new refresh token.
Request to the proxy should be made with a refresh token that expire after 24h to avoid
that multiple database use the same credentials. When receiving an error for an expired refresh_token,
This method makes a request to get a new refresh token.
'''
try:
with self.env.cr.savepoint(flush=False):
self.env.cr.execute('SELECT * FROM account_edi_proxy_client_user WHERE id IN %s FOR UPDATE NOWAIT', [tuple(self.ids)])
except OperationalError as e:
if e.pgcode == '55P03':
return
raise e
response = self._make_request(self._get_server_url() + '/iap/account_edi/1/renew_token')
if 'error' in response:
# can happen if the database was duplicated and the refresh_token was refreshed by the other database.
# we don't want two database to be able to query the proxy with the same user
# because it could lead to not inconsistent data.
_logger.error(response['error'])
self.sudo().refresh_token = response['refresh_token']
def _decrypt_data(self, data, symmetric_key):
''' Decrypt the data. Note that the data is encrypted with a symmetric key, which is encrypted with an asymmetric key.
We must therefore decrypt the symmetric key.
:param data: The data to decrypt.
:param symmetric_key: The symmetric_key encrypted with self.private_key.public_key()
'''
private_key = serialization.load_pem_private_key(
base64.b64decode(self.sudo().private_key),
password=None,
backend=default_backend()
)
key = private_key.decrypt(
base64.b64decode(symmetric_key),
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
f = Fernet(key)
return f.decrypt(base64.b64decode(data))