mass_mailing/tests/common.py

370 lines
16 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import datetime
import random
import re
import werkzeug
from unittest.mock import patch
from odoo import tools
from odoo.addons.link_tracker.tests.common import MockLinkTracker
from odoo.addons.mail.tests.common import MailCase, MailCommon, mail_new_test_user
from odoo.sql_db import Cursor
class MassMailCase(MailCase, MockLinkTracker):
    """ Test helpers for mass mailing: asserts on mailing statistics and
    traces, mail gateway simulators (bounce / click / open) and shortcuts
    creating traces and mailing lists. """

    # ------------------------------------------------------------
    # ASSERTS
    # ------------------------------------------------------------

    def assertMailingStatistics(self, mailing, **kwargs):
        """ Helper to assert mailing statistics fields. As we have many of them
        it helps lessening test asserts.

        :param mailing: a ``mailing.mailing`` record whose statistics fields
          are checked;
        :param kwargs: expected value for each statistics field. ``expected``
          and ``delivered`` default to the number of traces of the mailing
          when not given (or given as None); any other field defaults to 0;
        """
        traces_count = len(mailing.mailing_trace_ids)
        # default only values that were not given: an explicit 0 is a valid
        # expectation and must not be replaced by the number of traces
        for fname in ('expected', 'delivered'):
            if kwargs.get(fname) is None:
                kwargs[fname] = traces_count

        for fname in ['scheduled', 'expected', 'sent', 'delivered',
                      'opened', 'replied', 'clicked',
                      'canceled', 'failed', 'bounced']:
            expected_value = kwargs.get(fname, 0)
            self.assertEqual(
                mailing[fname], expected_value,
                'Mailing %s statistics failed: got %s instead of %s' % (fname, mailing[fname], expected_value)
            )

    def assertMailTraces(self, recipients_info, mailing, records,
                         check_mail=True, sent_unlink=False,
                         author=None, mail_links_info=None):
        """ Check content of traces. Traces are fetched based on a given mailing
        and records. Their content is compared to recipients_info structure that
        holds expected information. Links content may be checked, notably to
        assert shortening or unsubscribe links. Mail.mail records may optionally
        be checked.

        :param recipients_info: list[{
            # TRACE
            'partner': res.partner record (may be empty),
            'email': email used when sending email (may be empty, computed based on partner),
            'trace_status': outgoing / sent / open / reply / bounce / error / cancel (sent by default),
            'record': linked record,
            'failure_type': failure type of the trace; this key is mandatory
              when trace_status is error / cancel / bounce;
            'failure_reason': optional failure reason, checked on the trace
              and on the mail.mail;
            # MAIL.MAIL
            'content': optional content that should be present in mail.mail body_html;
            'email_to_mail': optional email used for the mail, when different from the
              one stored on the trace itself;
            'email_to_recipients': optional, see '_assertMailMail';
            'mail_values': optional dict of additional mail.mail field values
              to check;
            }, { ... }]
        :param mailing: a mailing.mailing record from which traces have been
          generated;
        :param records: records given to mailing that generated traces. It is
          used notably to find traces using their IDs;
        :param check_mail: if True, also check mail.mail records that should be
          linked to traces;
        :param sent_unlink: if True, sent mail.mail are deleted and we check gateway
          output result instead of actual mail.mail records;
        :param mail_links_info: if given, should follow order of ``recipients_info``
          and give details about links. See ``assertLinkShortenedHtml`` helper for
          more details about content to give;
        :param author: author of sent mail.mail;
        """
        # map trace state to email state
        state_mapping = {
            'outgoing': 'outgoing',  # not sent yet
            'sent': 'sent',
            'open': 'sent',  # opened implies something has been sent
            'reply': 'sent',  # replied implies something has been sent
            'error': 'exception',
            'cancel': 'cancel',
            'bounce': 'cancel',
        }

        traces = self.env['mailing.trace'].search([
            ('mass_mailing_id', 'in', mailing.ids),
            ('res_id', 'in', records.ids)
        ])
        # summary of all found traces, appended to failure messages
        debug_info = '\n'.join(
            (
                f'Trace: to {t.email} - state {t.trace_status} - res_id {t.res_id}'
                for t in traces
            )
        )

        # ensure trace coherency
        self.assertTrue(all(s.model == records._name for s in traces))
        self.assertEqual(set(s.res_id for s in traces), set(records.ids))

        # check each traces
        if not mail_links_info:
            mail_links_info = [None] * len(recipients_info)
        # recipients_info, mail_links_info and records are walked in lockstep:
        # they are expected to follow the same order
        for recipient_info, link_info, record in zip(recipients_info, mail_links_info, records):
            partner = recipient_info.get('partner', self.env['res.partner'])
            email = recipient_info.get('email')
            email_to_mail = recipient_info.get('email_to_mail') or email
            email_to_recipients = recipient_info.get('email_to_recipients')
            status = recipient_info.get('trace_status', 'sent')
            record = record or recipient_info.get('record')
            content = recipient_info.get('content')
            if email is None and partner:
                email = partner.email_normalized

            recipient_trace = traces.filtered(
                lambda t: (
                    (t.email == email or (not email and not t.email))
                    and t.trace_status == status
                    and (t.res_id == record.id if record else True)
                )
            )
            self.assertTrue(
                len(recipient_trace) == 1,
                'MailTrace: email %s (recipient %s, status: %s, record: %s): found %s records (1 expected)\n%s' % (
                    email, partner, status, record,
                    len(recipient_trace), debug_info)
            )
            self.assertTrue(bool(recipient_trace.mail_mail_id_int))
            # failing statuses always check the failure type: 'failure_type'
            # has to be given in recipient_info in that case (KeyError otherwise)
            if 'failure_type' in recipient_info or status in ('error', 'cancel', 'bounce'):
                self.assertEqual(recipient_trace.failure_type, recipient_info['failure_type'])
            if 'failure_reason' in recipient_info:
                self.assertEqual(recipient_trace.failure_reason, recipient_info['failure_reason'])

            if check_mail:
                if author is None:
                    author = self.env.user.partner_id

                # mail.mail specific values to check
                fields_values = {'mailing_id': mailing}
                if recipient_info.get('mail_values'):
                    fields_values.update(recipient_info['mail_values'])
                if 'failure_reason' in recipient_info:
                    fields_values['failure_reason'] = recipient_info['failure_reason']
                if 'email_to_mail' in recipient_info:
                    fields_values['email_to'] = recipient_info['email_to_mail']

                # specific for partner: email_formatted is used
                if partner:
                    if status == 'sent' and sent_unlink:
                        self.assertSentEmail(author, [partner])
                    else:
                        self.assertMailMail(
                            partner, state_mapping[status],
                            author=author,
                            content=content,
                            email_to_recipients=email_to_recipients,
                            fields_values=fields_values,
                        )
                # specific if email is False -> could have troubles finding it if several falsy traces
                elif not email and status in ('cancel', 'bounce'):
                    self.assertMailMailWId(
                        recipient_trace.mail_mail_id_int, state_mapping[status],
                        author=author,
                        content=content,
                        email_to_recipients=email_to_recipients,
                        fields_values=fields_values,
                    )
                else:
                    self.assertMailMailWEmails(
                        [email_to_mail], state_mapping[status],
                        author=author,
                        content=content,
                        email_to_recipients=email_to_recipients,
                        fields_values=fields_values,
                    )

            if link_info:
                trace_mail = self._find_mail_mail_wrecord(record)
                for (anchor_id, url, is_shortened, add_link_params) in link_info:
                    link_params = {'utm_medium': 'Email', 'utm_source': mailing.name}
                    if add_link_params:
                        link_params.update(**add_link_params)
                    self.assertLinkShortenedHtml(
                        trace_mail.body_html,
                        (anchor_id, url, is_shortened),
                        link_params=link_params,
                    )

    # ------------------------------------------------------------
    # TOOLS
    # ------------------------------------------------------------

    def gateway_mail_bounce(self, mailing, record, bounce_base_values=None):
        """ Generate a bounce at mailgateway level.

        :param mailing: a ``mailing.mailing`` record on which we find a trace
          to bounce;
        :param record: record which should bounce;
        :param bounce_base_values: optional values given to routing;

        :return: the trace of ``mailing`` matching ``record``;
        """
        trace = mailing.mailing_trace_ids.filtered(
            lambda t: t.model == record._name and t.res_id == record.id
        )

        parsed_bounce_values = {
            'email_from': 'some.email@external.example.com',  # TDE check: email_from -> trace email ?
            'to': 'bounce@test.example.com',  # TDE check: bounce alias ?
            'message_id': tools.generate_tracking_message_id('MailTest'),
            'bounced_partner': self.env['res.partner'].sudo(),
            'bounced_message': self.env['mail.message'].sudo()
        }
        if bounce_base_values:
            parsed_bounce_values.update(bounce_base_values)
        # trace-related values are set last: they always win over base values
        parsed_bounce_values.update({
            'bounced_email': trace.email,
            'bounced_msg_ids': [trace.message_id],
        })
        self.env['mail.thread']._routing_handle_bounce(False, parsed_bounce_values)
        return trace

    def gateway_mail_click(self, mailing, record, click_label):
        """ Simulate a click on a sent email.

        :param mailing: a ``mailing.mailing`` record on which we find a trace
          to click;
        :param record: record which should click;
        :param click_label: label of link on which we should click;

        :return: the trace of ``mailing`` matching ``record``;
        :raise AssertionError: if no shortened link labelled ``click_label``
          is found in the sent email body;
        """
        trace = mailing.mailing_trace_ids.filtered(
            lambda t: t.model == record._name and t.res_id == record.id
        )

        email = self._find_sent_mail_wemail(trace.email)
        self.assertTrue(bool(email))
        for (_url_href, link_url, _dummy, label) in re.findall(tools.HTML_TAG_URL_REGEX, email['body']):
            if label == click_label and '/r/' in link_url:  # shortened link, like 'http://localhost:8069/r/LBG/m/53'
                parsed_url = werkzeug.urls.url_parse(link_url)
                path_items = parsed_url.path.split('/')
                code, trace_id = path_items[2], int(path_items[4])
                self.assertEqual(trace.id, trace_id)

                self.env['link.tracker.click'].sudo().add_click(
                    code,
                    ip='100.200.300.%3f' % random.random(),
                    country_code='BE',
                    mailing_trace_id=trace.id
                )
                break
        else:
            # loop ended without 'break': no matching link has been clicked
            raise AssertionError('url %s not found in mailing %s for record %s' % (click_label, mailing, record))
        return trace

    def gateway_mail_open(self, mailing, record):
        """ Simulate opening an email through blank.gif icon access. As we
        don't want to use the whole Http layer just for that we will just
        call 'set_opened()' on trace, until having a better option.

        :param mailing: a ``mailing.mailing`` record on which we find a trace
          to open;
        :param record: record which should open;

        :return: the trace of ``mailing`` matching ``record``;
        """
        trace = mailing.mailing_trace_ids.filtered(
            lambda t: t.model == record._name and t.res_id == record.id
        )
        trace.set_opened()
        return trace

    @classmethod
    def _create_bounce_trace(cls, mailing, records, dt=None):
        """ Create one 'bounce' trace per record, see ``_create_traces``.

        :param dt: datetime given to ``_create_traces``; one day before now
          when not given;
        """
        if dt is None:
            dt = datetime.datetime.now() - datetime.timedelta(days=1)
        return cls._create_traces(mailing, records, dt, trace_status='bounce')

    @classmethod
    def _create_sent_traces(cls, mailing, records, dt=None):
        """ Create one 'sent' trace per record, see ``_create_traces``.

        :param dt: datetime given to ``_create_traces``; one day before now
          when not given;
        """
        if dt is None:
            dt = datetime.datetime.now() - datetime.timedelta(days=1)
        return cls._create_traces(mailing, records, dt, trace_status='sent')

    @classmethod
    def _create_traces(cls, mailing, records, dt, **values):
        """ Create one ``mailing.trace`` per record, linked to the mailing.

        :param mailing: a ``mailing.mailing`` record the traces belong to;
        :param records: records for which traces are created. Trace email is
          read on the first existing field among ``email_normalized``,
          ``email_from`` and ``email``;
        :param dt: value returned by ``Cursor.now()`` while traces are created;
        :param values: additional trace values, overriding the computed ones.
          ``trace_status`` is 'bounce' when not given;

        :return: created traces;
        """
        if 'email_normalized' in records:
            fname = 'email_normalized'
        elif 'email_from' in records:
            fname = 'email_from'
        else:
            fname = 'email'
        # NOTE(review): computed once, hence all traces of a batch share the
        # same message_id -- confirm this is intended
        randomized = random.random()
        # Cursor.now() uses transaction's timestamp and not datetime lib -> freeze_time
        # is not sufficient
        with patch.object(Cursor, 'now', lambda *args, **kwargs: dt):
            traces = cls.env['mailing.trace'].sudo().create([
                dict({'mass_mailing_id': mailing.id,
                      'model': record._name,
                      'res_id': record.id,
                      'trace_status': values.get('trace_status', 'bounce'),
                      # TDE FIXME: improve this with a mail-enabled heuristics
                      'email': record[fname],
                      'message_id': '<%5f@gilbert.boitempomils>' % randomized,
                     }, **values)
                for record in records
            ])
        return traces

    @classmethod
    def _create_mailing_list(cls):
        """ Shortcut to create mailing lists. Currently hardcoded, maybe evolve
        in a near future. Lists are stored on the class as ``mailing_list_1``
        to ``mailing_list_4``; first subscription of ``mailing_list_3`` is
        opted out. """
        cls.mailing_list_1, cls.mailing_list_2, cls.mailing_list_3, cls.mailing_list_4 = cls.env['mailing.list'].with_context(cls._test_context).create([
            {
                'contact_ids': [
                    (0, 0, {'name': 'Déboulonneur', 'email': 'fleurus@example.com'}),
                    (0, 0, {'name': 'Gorramts', 'email': 'gorramts@example.com'}),
                    (0, 0, {'name': 'Ybrant', 'email': 'ybrant@example.com'}),
                ],
                'name': 'List1',
                'is_public': True,
            }, {
                'contact_ids': [
                    (0, 0, {'name': 'Gilberte', 'email': 'gilberte@example.com'}),
                    (0, 0, {'name': 'Gilberte En Mieux', 'email': 'gilberte@example.com'}),
                    (0, 0, {'name': 'Norbert', 'email': 'norbert@example.com'}),
                    (0, 0, {'name': 'Ybrant', 'email': 'ybrant@example.com'}),
                ],
                'name': 'List2',
                'is_public': True,
            }, {
                'contact_ids': [
                    (0, 0, {'name': 'Déboulonneur', 'email': 'fleurus@example.com'}),
                ],
                'name': 'List3',
                'is_public': True,
            }, {
                'name': 'List4',
            }
        ])
        cls.mailing_list_3.subscription_ids[0].opt_out = True

    @classmethod
    def _create_mailing_list_of_x_contacts(cls, contacts_nbr):
        """ Shortcut to create a mailing list that contains a defined number
        of contacts.

        :param contacts_nbr: number of contacts to create in the list. Contacts
          are named 'Contact <idx>' with email 'contact<idx>@example.com',
          idx starting at 0;

        :return: created ``mailing.list`` record;
        """
        return cls.env['mailing.list'].with_context(cls._test_context).create({
            'name': 'Test List',
            'contact_ids': [
                (0, 0, {
                    'name': f'Contact {idx}',
                    'email': f'contact{idx}@example.com'
                })
                for idx in range(contacts_nbr)
            ],
        })
class MassMailCommon(MailCommon, MassMailCase):
    """ Common base for mass mailing tests: on top of parent classes setup,
    provides a marketing user and a default reply-to value. """

    @classmethod
    def setUpClass(cls):
        """ Run parent setup, then store ``email_reply_to`` and create
        ``user_marketing`` (internal user, partner manager and mass mailing
        user) on the class. Environment is flushed at the end. """
        super().setUpClass()

        cls.email_reply_to = 'MyCompany SomehowAlias <test.alias@test.mycompany.com>'

        marketing_user_values = {
            'groups': 'base.group_user,base.group_partner_manager,mass_mailing.group_mass_mailing_user',
            'login': 'user_marketing',
            'name': 'Martial Marketing',
            'signature': '--\nMartial',
        }
        cls.user_marketing = mail_new_test_user(cls.env, **marketing_user_values)

        cls.env.flush_all()