link_tracker/models/link_tracker.py

304 lines
12 KiB
Python
Raw Permalink Normal View History

# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import random
import requests
import string
from lxml import html
from werkzeug import urls
from odoo import tools, models, fields, api, _
from odoo.exceptions import UserError
from odoo.osv import expression
from odoo.addons.mail.tools import link_preview
class LinkTracker(models.Model):
    """ Link trackers allow users to wrap any URL into a short URL that can be
    tracked by Odoo. Clicks are counted on each link. A tracker is linked to
    UTMs allowing to analyze marketing actions.

    This model is also used in mass_mailing where each link in html body is
    automatically converted into a short link that is tracked and integrates
    UTMs. """

    _name = "link.tracker"
    _rec_name = "short_url"
    _description = "Link Tracker"
    _order = "count DESC"
    _inherit = ["utm.mixin"]

    # URL info
    url = fields.Char(string='Target URL', required=True)
    absolute_url = fields.Char("Absolute URL", compute="_compute_absolute_url")
    short_url = fields.Char(string='Tracked URL', compute='_compute_short_url')
    redirected_url = fields.Char(string='Redirected URL', compute='_compute_redirected_url')
    short_url_host = fields.Char(string='Host of the short URL', compute='_compute_short_url_host')
    title = fields.Char(string='Page Title', store=True)
    label = fields.Char(string='Button label')

    # Tracking
    link_code_ids = fields.One2many('link.tracker.code', 'link_id', string='Codes')
    code = fields.Char(string='Short URL code', compute='_compute_code')
    link_click_ids = fields.One2many('link.tracker.click', 'link_id', string='Clicks')
    count = fields.Integer(string='Number of Clicks', compute='_compute_count', store=True)

    # UTMs - enforcing the fact that we want to 'set null' when relation is unlinked
    campaign_id = fields.Many2one(ondelete='set null')
    medium_id = fields.Many2one(ondelete='set null')
    source_id = fields.Many2one(ondelete='set null')

    @api.depends("url")
    def _compute_absolute_url(self):
        """Set ``absolute_url``: the URL itself when it carries a scheme,
        otherwise the URL joined onto the record's base URL."""
        for tracker in self:
            url = urls.url_parse(tracker.url)
            if url.scheme:
                tracker.absolute_url = tracker.url
            else:
                # get_base_url() is handled as a plain string in this model
                # (see the short URL host computation), so it must be parsed
                # before the URL-level join()/to_url() can be used.
                base_url = urls.url_parse(tracker.get_base_url())
                tracker.absolute_url = base_url.join(url).to_url()

    @api.depends('link_click_ids.link_id')
    def _compute_count(self):
        """Set ``count`` to the number of click records of each tracker,
        using one grouped read for the whole recordset."""
        clicks_data = self.env['link.tracker.click']._read_group(
            [('link_id', 'in', self.ids)],
            ['link_id'],
            ['__count'],
        )
        mapped_data = {link.id: count for link, count in clicks_data}
        for tracker in self:
            # trackers without any click are absent from the grouped result
            tracker.count = mapped_data.get(tracker.id, 0)

    @api.depends('code')
    def _compute_short_url(self):
        """Set ``short_url`` by joining the tracker code onto ``short_url_host``."""
        for tracker in self:
            tracker.short_url = urls.url_join(tracker.short_url_host, '%(code)s' % {'code': tracker.code})

    def _compute_short_url_host(self):
        """Set ``short_url_host`` to the record's base URL followed by ``/r/``."""
        for tracker in self:
            tracker.short_url_host = tracker.get_base_url() + '/r/'

    def _compute_code(self):
        """Set ``code`` to the code of the most recently created
        ``link.tracker.code`` (highest id) attached to each tracker."""
        for tracker in self:
            record = self.env['link.tracker.code'].search([('link_id', '=', tracker.id)], limit=1, order='id DESC')
            tracker.code = record.code

    @api.depends('url')
    def _compute_redirected_url(self):
        """Compute the URL to which we will redirect the user.

        By default, add UTM values as GET parameters. But if the system parameter
        `link_tracker.no_external_tracking` is set, we add the UTM values in the URL
        *only* for URLs that redirect to the local website (base URL).
        """
        no_external_tracking = self.env['ir.config_parameter'].sudo().get_param('link_tracker.no_external_tracking')

        for tracker in self:
            base_domain = urls.url_parse(tracker.get_base_url()).netloc
            parsed = urls.url_parse(tracker.url)
            if no_external_tracking and parsed.netloc and parsed.netloc != base_domain:
                # external target and external tracking disabled: no UTM added
                tracker.redirected_url = parsed.to_url()
                continue

            utms = {}
            for key, field_name, cook in self.env['utm.mixin'].tracking_fields():
                field = self._fields[field_name]
                attr = tracker[field_name]
                if field.type == 'many2one':
                    attr = attr.name
                if attr:
                    utms[key] = attr
            # query parameters already present in the URL win over the UTM values
            utms.update(parsed.decode_query())
            tracker.redirected_url = parsed.replace(query=urls.url_encode(utms)).to_url()

    @api.model
    def _get_title_from_url(self, url):
        """Return the ``og_title`` of the link preview fetched for ``url``,
        or ``url`` itself when no such title is available."""
        preview = link_preview.get_link_preview_from_url(url)
        if preview and preview.get('og_title'):
            return preview['og_title']
        return url

    @api.constrains('url', 'campaign_id', 'medium_id', 'source_id')
    def _check_unicity(self):
        """Check that the link trackers are unique."""
        # build a query to fetch all needed link trackers at once
        search_query = expression.OR([
            expression.AND([
                [('url', '=', tracker.url)],
                [('campaign_id', '=', tracker.campaign_id.id)],
                [('medium_id', '=', tracker.medium_id.id)],
                [('source_id', '=', tracker.source_id.id)],
            ])
            for tracker in self
        ])

        # Can not be implemented with a SQL constraint because we want to care about null values.
        all_link_trackers = self.search(search_query)

        # check for unicity
        for tracker in self:
            if all_link_trackers.filtered(
                lambda l: l.url == tracker.url
                and l.campaign_id == tracker.campaign_id
                and l.medium_id == tracker.medium_id
                and l.source_id == tracker.source_id
            ) != tracker:
                raise UserError(_(
                    'Link Tracker values (URL, campaign, medium and source) must be unique (%s, %s, %s, %s).',
                    tracker.url,
                    tracker.campaign_id.name,
                    tracker.medium_id.name,
                    tracker.source_id.name,
                ))

    @api.model_create_multi
    def create(self, vals_list):
        """Create link trackers and one ``link.tracker.code`` per new tracker.

        Each values dict must contain a ``url``; it is passed through
        ``tools.validate_url``, a missing title is filled from
        ``_get_title_from_url`` and missing UTM fields are forced to False.

        :raises ValueError: when a values dict has no ``url`` key
        :raises UserError: when a ``url`` starts with ``?`` or ``#``
        """
        # work on copies so that the dicts of the caller are left untouched
        vals_list = [vals.copy() for vals in vals_list]
        for vals in vals_list:
            if 'url' not in vals:
                raise ValueError(_('Creating a Link Tracker without URL is not possible'))

            if vals['url'].startswith(('?', '#')):
                raise UserError(_("%r is not a valid link, links cannot redirect to the current page.", vals['url']))

            vals['url'] = tools.validate_url(vals['url'])

            if not vals.get('title'):
                vals['title'] = self._get_title_from_url(vals['url'])

            # Prevent the UTMs to be set by the values of UTM cookies
            for (__, fname, __) in self.env['utm.mixin'].tracking_fields():
                if fname not in vals:
                    vals[fname] = False

        links = super(LinkTracker, self).create(vals_list)

        link_tracker_codes = self.env['link.tracker.code']._get_random_code_strings(len(vals_list))

        self.env['link.tracker.code'].sudo().create([
            {
                'code': code,
                'link_id': link.id,
            } for link, code in zip(links, link_tracker_codes)
        ])

        return links

    @api.model
    def search_or_create(self, vals):
        """Return the tracker matching ``vals`` on URL, campaign, medium and
        source, creating it when none exists.

        A UTM field missing from ``vals`` is matched as empty, mirroring
        ``create`` which stores False for missing UTM fields. The ``vals``
        dict of the caller is not modified.

        :param dict vals: values of the tracker, must contain ``url``
        :raises ValueError: when ``vals`` has no ``url`` key
        :raises UserError: when the ``url`` starts with ``?`` or ``#``
        """
        if 'url' not in vals:
            raise ValueError(_('Creating a Link Tracker without URL is not possible'))

        if vals['url'].startswith(('?', '#')):
            raise UserError(_("%r is not a valid link, links cannot redirect to the current page.", vals['url']))

        # copy instead of writing the validated URL back into the caller's dict
        vals = dict(vals, url=tools.validate_url(vals['url']))

        # Always match on the four fields checked by the unicity constraint:
        # leaving a missing UTM out of the domain could return a tracker bound
        # to an unrelated campaign / medium / source.
        unique_fields = ('url', 'campaign_id', 'medium_id', 'source_id')
        search_domain = [
            (fname, '=', vals.get(fname) or False)
            for fname in unique_fields
        ]

        result = self.search(search_domain, limit=1)

        if result:
            return result

        return self.create(vals)

    @api.model
    def convert_links(self, html, vals, blacklist=None):
        """Removed entry point, always raises ``NotImplementedError``."""
        raise NotImplementedError('Moved on mail.render.mixin')

    def _convert_links_text(self, body, vals, blacklist=None):
        """Removed entry point, always raises ``NotImplementedError``."""
        raise NotImplementedError('Moved on mail.render.mixin')

    def action_view_statistics(self):
        """Return the click statistics window action restricted to this tracker."""
        action = self.env['ir.actions.act_window']._for_xml_id('link_tracker.link_tracker_click_action_statistics')
        action['domain'] = [('link_id', '=', self.id)]
        action['context'] = dict(self._context, create=False)
        return action

    def action_visit_page(self):
        """Return a URL action opening the target URL in a new window."""
        return {
            'name': _("Visit Webpage"),
            'type': 'ir.actions.act_url',
            'url': self.url,
            'target': 'new',
        }

    @api.model
    def recent_links(self, filter, limit):
        """Read up to ``limit`` trackers selected by ``filter``.

        :param str filter: ``'newest'``, ``'most-clicked'`` or ``'recently-used'``
        :param int limit: maximum number of trackers to read
        :return: the ``search_read`` result, or a dict holding an ``'Error'``
            key when ``filter`` is not one of the supported values
        """
        if filter == 'newest':
            return self.search_read([], order='create_date DESC, id DESC', limit=limit)
        elif filter == 'most-clicked':
            return self.search_read([('count', '!=', 0)], order='count DESC', limit=limit)
        elif filter == 'recently-used':
            return self.search_read([('count', '!=', 0)], order='write_date DESC, id DESC', limit=limit)
        else:
            return {'Error': "This filter doesn't exist."}

    @api.model
    def get_url_from_code(self, code):
        """Return the redirected URL of the tracker owning ``code``, or None
        when no ``link.tracker.code`` record has that code."""
        code_rec = self.env['link.tracker.code'].sudo().search([('code', '=', code)])

        if not code_rec:
            return None

        return code_rec.link_id.redirected_url
class LinkTrackerCode(models.Model):
    """Short code attached to a link tracker; codes are unique."""

    _name = "link.tracker.code"
    _description = "Link Tracker Code"
    _rec_name = 'code'

    code = fields.Char(string='Short URL Code', required=True, store=True)
    link_id = fields.Many2one('link.tracker', 'Link', required=True, ondelete='cascade')

    _sql_constraints = [
        ('code', 'unique( code )', 'Code must be unique.')
    ]

    @api.model
    def _get_random_code_strings(self, n=1):
        """Return ``n`` random alphanumeric codes that are distinct from each
        other and not used by any existing code record.

        Codes start at 3 characters; every rejected batch makes the next
        batch one character longer.
        """
        alphabet = string.ascii_letters + string.digits
        length = 3
        while True:
            candidates = [
                ''.join(random.choices(alphabet, k=length))
                for __ in range(n)
            ]
            all_distinct = len(set(candidates)) == n
            # only query existing codes once the batch itself has no duplicate
            if all_distinct and not self.search([('code', 'in', candidates)]):
                return candidates
            length += 1
class LinkTrackerClick(models.Model):
    """One recorded click on a link tracker."""

    _name = "link.tracker.click"
    _rec_name = "link_id"
    _description = "Link Tracker Click"

    campaign_id = fields.Many2one(
        'utm.campaign', 'UTM Campaign',
        related="link_id.campaign_id", store=True, ondelete="set null")
    link_id = fields.Many2one(
        'link.tracker', 'Link',
        index=True, required=True, ondelete='cascade')
    ip = fields.Char(string='Internet Protocol')
    country_id = fields.Many2one('res.country', 'Country')

    def _prepare_click_values_from_route(self, **route_values):
        """Build the creation values of a click from route values.

        Only keys that are fields of this model are kept. When no country is
        given but a ``country_code`` is, the country is looked up by code.
        """
        click_values = {
            fname: route_values[fname]
            for fname in self._fields
            if fname in route_values
        }
        if click_values.get('country_id') or not route_values.get('country_code'):
            return click_values

        country = self.env['res.country'].search([('code', '=', route_values['country_code'])], limit=1)
        click_values['country_id'] = country.id
        return click_values

    @api.model
    def add_click(self, code, **route_values):
        """ Main API to add a click on a link. """
        code_record = self.env['link.tracker.code'].search([('code', '=', code)])
        if not code_record:
            # unknown code: nothing to record
            return None

        route_values['link_id'] = code_record.link_id.id
        return self.create(self._prepare_click_values_from_route(**route_values))