mail/web_push.py
Данил Воробьев 6e6f15d803 initial commit
2024-05-03 09:40:35 +00:00

150 lines
5.8 KiB
Python

# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import json
import logging as logger
import os
import struct
import textwrap
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
from urllib.parse import urlsplit
from .tools import jwt
# Default record size (in bytes) used by _encrypt_payload when the caller
# does not pass an explicit record_size.
MAX_PAYLOAD_SIZE = 4096
# Module-level logger, named after this module.
_logger = logger.getLogger(__name__)
def _iv(base, counter):
mask = int.from_bytes(base[4:], 'big')
return base[:4] + (counter ^ mask).to_bytes(8, 'big')
def _derive_key(salt, private_key, device):
    """
    Derive the content-encryption key and the nonce for one device.

    :param salt: the salt given to the key and nonce HKDF derivations
    :param private_key: the sender private key; it must provide
        ``public_key()`` and ``exchange()`` (presumably a ``cryptography``
        EC private key on P-256)
    :param device: mapping whose "keys" entry is a JSON string holding the
        browser "p256dh" and "auth" values
    :return: a tuple ``(key, nonce)`` of 16 and 12 bytes respectively
    """
    def _sha256_hkdf(length, hkdf_salt, info):
        # The three derivations only differ by output length, salt and info.
        return HKDF(
            algorithm=hashes.SHA256(),
            length=length,
            salt=hkdf_salt,
            info=info,
            backend=default_backend(),
        )

    # Keys provided by the browser at subscription time.
    browser_keys = json.loads(device["keys"])
    receiver_point = jwt.base64_decode_with_padding(browser_keys.get('p256dh'))
    auth_secret = jwt.base64_decode_with_padding(browser_keys.get('auth'))

    # Load the browser public key as a point on the P-256 curve.
    receiver_key = ec.EllipticCurvePublicKey.from_encoded_point(
        ec.SECP256R1(), receiver_point
    )
    sender_point = private_key.public_key().public_bytes(
        Encoding.X962, PublicFormat.UncompressedPoint
    )

    # One HKDF per derived value: intermediate secret, key and nonce.
    auth_kdf = _sha256_hkdf(
        32, auth_secret, b"WebPush: info\x00" + receiver_point + sender_point
    )
    key_kdf = _sha256_hkdf(16, salt, b"Content-Encoding: aes128gcm\x00")
    nonce_kdf = _sha256_hkdf(12, salt, b"Content-Encoding: nonce\x00")

    # The ECDH shared secret feeds the first HKDF; its output feeds the
    # two others.
    shared_secret = private_key.exchange(ec.ECDH(), receiver_key)
    input_key_material = auth_kdf.derive(shared_secret)
    content_key = key_kdf.derive(input_key_material)
    base_nonce = nonce_kdf.derive(input_key_material)
    return content_key, base_nonce
def _encrypt_payload(content, device, record_size=MAX_PAYLOAD_SIZE):
    """
    Encrypt a payload for a Push Notification Endpoint using the aes128gcm
    content encoding.

    https://www.rfc-editor.org/rfc/rfc8188
    https://www.rfc-editor.org/rfc/rfc8291

    :param content: the unencrypted payload (bytes)
    :param device: the web push user browser information
    :param record_size: size of one encrypted record, must be at least 18
    :return: the encrypted payload: the header followed by the records
    :raise ValueError: if ``record_size`` is smaller than 18
    """
    # AEAD_AES_128_GCM produces ciphertext 16 octets longer than its input plaintext.
    # Therefore, the unencrypted content of each record is shorter than the record size by 16 octets.
    # Valid records always contain at least a padding delimiter octet and a 16-octet authentication tag.
    overhead = 1 + 16
    if record_size <= overhead:
        raise ValueError(
            "record_size must be at least %d, got %r" % (overhead + 1, record_size)
        )
    chunk_size = record_size - overhead

    # The private_key is an ephemeral ECDH key used only for a transaction
    private_key = ec.generate_private_key(ec.SECP256R1(), default_backend())
    salt = os.urandom(16)

    # generate key
    (key, nonce) = _derive_key(salt=salt, private_key=private_key, device=device)

    end = len(content)
    aesgcm = AESGCM(key)
    records = []
    # max(end, 1): an empty content must still produce one (final) record,
    # otherwise the body would carry no record at all.
    for counter, start in enumerate(range(0, max(end, 1), chunk_size)):
        # 0x02 delimits the last record, 0x01 every other record.
        delimiter = b"\x02" if (start + chunk_size) >= end else b"\x01"
        # Each record needs its own nonce (base nonce XOR record counter):
        # reusing a nonce with the same AES-GCM key breaks the encryption and
        # the receiver could not decrypt any record after the first one.
        records.append(aesgcm.encrypt(
            _iv(nonce, counter),
            content[start:start + chunk_size] + delimiter,
            None,
        ))
    body = b"".join(records)

    sender_public_key = private_key.public_key().public_bytes(
        Encoding.X962, PublicFormat.UncompressedPoint
    )

    # +-----------+-----------------+---------------------------+-------------------------------------------+
    # | salt (16) | record_size (4) | sender_public_key.len (1) | sender_public_key (sender_public_key.len) |
    # +-----------+-----------------+---------------------------+-------------------------------------------+
    header = struct.pack("!16sLB", salt, record_size, len(sender_public_key))
    header += sender_public_key
    return header + body
def push_to_end_point(base_url, device, payload, vapid_private_key, vapid_public_key, session):
    """
    Encrypt ``payload`` and POST it to the push endpoint of ``device``.

    https://www.rfc-editor.org/rfc/rfc8291

    :param base_url: value sent as the "sub" claim of the VAPID token
    :param device: mapping providing the "endpoint" URL; it is also handed
        to the payload encryption
    :param payload: the notification content, as a string
    :param vapid_private_key: key used to sign the VAPID token
    :param vapid_public_key: value sent as the "k" parameter of the
        Authorization header
    :param session: object used to send the request through its ``post``
        method (presumably a ``requests`` session)
    :raise DeviceUnreachableError: if the push service answers 404 or 410
    """
    endpoint = device["endpoint"]
    endpoint_parts = urlsplit(endpoint)
    # aud: the "Audience" is a JWT construct that indicates the recipient scheme and host
    # e.g. for an endpoint like https://updates.push.services.mozilla.com/wpush/v2/gAAAAABY...,
    # the "aud" would be https://updates.push.services.mozilla.com
    audience = '{}://{}'.format(endpoint_parts.scheme, endpoint_parts.netloc)
    # sub: contact address of the sender, so that a push service needing to
    # reach out to the sender can find contact information in the JWT.
    claims = {'aud': audience, 'sub': base_url}
    token = jwt.sign(claims, vapid_private_key, ttl=12 * 60 * 60, algorithm=jwt.Algorithm.ES256)

    encrypted_body = _encrypt_payload(payload.encode(), device)

    # Authorization header field contains these parameters:
    # - "t" is the JWT;
    # - "k" the base64url-encoded key that signed that token.
    authorization = 'vapid t={}, k={}'.format(token, vapid_public_key)
    response = session.post(
        endpoint,
        headers={
            'Authorization': authorization,
            'Content-Encoding': 'aes128gcm',
            'TTL': '0',
        },
        data=encrypted_body,
        timeout=5,
    )

    if response.status_code == 201:
        _logger.debug('Sent push notification %s', endpoint)
        return

    short_error = textwrap.shorten(response.text, 100)
    _logger.warning('Failed push notification %s %d - %s',
                    endpoint, response.status_code, short_error)
    # Invalid subscription
    if response.status_code in (404, 410):
        raise DeviceUnreachableError("Device Unreachable")
class DeviceUnreachableError(Exception):
    """Raised when the push service answers 404 or 410 for a device endpoint."""