# -*- coding: utf-8 -*- # Part of Odoo. See LICENSE file for full copyright and licensing details. import base64 import json import math import re from werkzeug import urls from odoo import http, tools, _, SUPERUSER_ID from odoo.exceptions import AccessDenied, AccessError, MissingError, UserError, ValidationError from odoo.http import content_disposition, Controller, request, route from odoo.tools import consteq # -------------------------------------------------- # Misc tools # -------------------------------------------------- def pager(url, total, page=1, step=30, scope=5, url_args=None): """ Generate a dict with required value to render `website.pager` template. This method computes url, page range to display, ... in the pager. :param str url : base url of the page link :param int total : number total of item to be splitted into pages :param int page : current page :param int step : item per page :param int scope : number of page to display on pager :param dict url_args : additionnal parameters to add as query params to page url :returns dict """ # Compute Pager page_count = int(math.ceil(float(total) / step)) page = max(1, min(int(page if str(page).isdigit() else 1), page_count)) scope -= 1 pmin = max(page - int(math.floor(scope/2)), 1) pmax = min(pmin + scope, page_count) if pmax - pmin < scope: pmin = pmax - scope if pmax - scope > 0 else 1 def get_url(page): _url = "%s/page/%s" % (url, page) if page > 1 else url if url_args: _url = "%s?%s" % (_url, urls.url_encode(url_args)) return _url return { "page_count": page_count, "offset": (page - 1) * step, "page": { 'url': get_url(page), 'num': page }, "page_first": { 'url': get_url(1), 'num': 1 }, "page_start": { 'url': get_url(pmin), 'num': pmin }, "page_previous": { 'url': get_url(max(pmin, page - 1)), 'num': max(pmin, page - 1) }, "page_next": { 'url': get_url(min(pmax, page + 1)), 'num': min(pmax, page + 1) }, "page_end": { 'url': get_url(pmax), 'num': pmax }, "page_last": { 'url': get_url(page_count), 'num': page_count }, "pages": [ {'url': get_url(page_num), 'num': page_num} for page_num in range(pmin, pmax+1) ] } def get_records_pager(ids, current): if current.id in ids and (hasattr(current, 'website_url') or hasattr(current, 'access_url')): attr_name = 'access_url' if hasattr(current, 'access_url') else 'website_url' idx = ids.index(current.id) prev_record = idx != 0 and current.browse(ids[idx - 1]) next_record = idx < len(ids) - 1 and current.browse(ids[idx + 1]) if prev_record and prev_record[attr_name] and attr_name == "access_url": prev_url = '%s?access_token=%s' % (prev_record[attr_name], prev_record._portal_ensure_token()) elif prev_record and prev_record[attr_name]: prev_url = prev_record[attr_name] else: prev_url = prev_record if next_record and next_record[attr_name] and attr_name == "access_url": next_url = '%s?access_token=%s' % (next_record[attr_name], next_record._portal_ensure_token()) elif next_record and next_record[attr_name]: next_url = next_record[attr_name] else: next_url = next_record return { 'prev_record': prev_url, 'next_record': next_url, } return {} def _build_url_w_params(url_string, query_params, remove_duplicates=True): """ Rebuild a string url based on url_string and correctly compute query parameters using those present in the url and those given by query_params. Having duplicates in the final url is optional. For example: * url_string = '/my?foo=bar&error=pay' * query_params = {'foo': 'bar2', 'alice': 'bob'} * if remove duplicates: result = '/my?foo=bar2&error=pay&alice=bob' * else: result = '/my?foo=bar&foo=bar2&error=pay&alice=bob' """ url = urls.url_parse(url_string) url_params = url.decode_query() if remove_duplicates: # convert to standard dict instead of werkzeug multidict to remove duplicates automatically url_params = url_params.to_dict() url_params.update(query_params) return url.replace(query=urls.url_encode(url_params)).to_url() class CustomerPortal(Controller): MANDATORY_BILLING_FIELDS = ["name", "phone", "email", "street", "city", "country_id"] OPTIONAL_BILLING_FIELDS = ["zipcode", "state_id", "vat", "company_name"] _items_per_page = 80 def _prepare_portal_layout_values(self): """Values for /my/* templates rendering. Does not include the record counts. """ # get customer sales rep sales_user_sudo = request.env['res.users'] partner_sudo = request.env.user.partner_id if partner_sudo.user_id and not partner_sudo.user_id._is_public(): sales_user_sudo = partner_sudo.user_id else: fallback_sales_user = partner_sudo.commercial_partner_id.user_id if fallback_sales_user and not fallback_sales_user._is_public(): sales_user_sudo = fallback_sales_user return { 'sales_user': sales_user_sudo, 'page_name': 'home', } def _prepare_home_portal_values(self, counters): """Values for /my & /my/home routes template rendering. Includes the record count for the displayed badges. where 'counters' is the list of the displayed badges and so the list to compute. """ return {} @route(['/my/counters'], type='json', auth="user", website=True) def counters(self, counters, **kw): return self._prepare_home_portal_values(counters) @route(['/my', '/my/home'], type='http', auth="user", website=True) def home(self, **kw): values = self._prepare_portal_layout_values() return request.render("portal.portal_my_home", values) @route(['/my/account'], type='http', auth='user', website=True) def account(self, redirect=None, **post): values = self._prepare_portal_layout_values() partner = request.env.user.partner_id values.update({ 'error': {}, 'error_message': [], }) if post and request.httprequest.method == 'POST': if not partner.can_edit_vat(): post['country_id'] = str(partner.country_id.id) error, error_message = self.details_form_validate(post) values.update({'error': error, 'error_message': error_message}) values.update(post) if not error: values = {key: post[key] for key in self._get_mandatory_fields()} values.update({key: post[key] for key in self._get_optional_fields() if key in post}) for field in set(['country_id', 'state_id']) & set(values.keys()): try: values[field] = int(values[field]) except: values[field] = False values.update({'zip': values.pop('zipcode', '')}) self.on_account_update(values, partner) partner.sudo().write(values) if redirect: return request.redirect(redirect) return request.redirect('/my/home') countries = request.env['res.country'].sudo().search([]) states = request.env['res.country.state'].sudo().search([]) values.update({ 'partner': partner, 'countries': countries, 'states': states, 'has_check_vat': hasattr(request.env['res.partner'], 'check_vat'), 'partner_can_edit_vat': partner.can_edit_vat(), 'redirect': redirect, 'page_name': 'my_details', }) response = request.render("portal.portal_my_details", values) response.headers['X-Frame-Options'] = 'SAMEORIGIN' response.headers['Content-Security-Policy'] = "frame-ancestors 'self'" return response def on_account_update(self, values, partner): pass @route('/my/security', type='http', auth='user', website=True, methods=['GET', 'POST']) def security(self, **post): values = self._prepare_portal_layout_values() values['get_error'] = get_error values['allow_api_keys'] = bool(request.env['ir.config_parameter'].sudo().get_param('portal.allow_api_keys')) values['open_deactivate_modal'] = False if request.httprequest.method == 'POST': values.update(self._update_password( post['old'].strip(), post['new1'].strip(), post['new2'].strip() )) return request.render('portal.portal_my_security', values, headers={ 'X-Frame-Options': 'SAMEORIGIN', 'Content-Security-Policy': "frame-ancestors 'self'" }) def _update_password(self, old, new1, new2): for k, v in [('old', old), ('new1', new1), ('new2', new2)]: if not v: return {'errors': {'password': {k: _("You cannot leave any password empty.")}}} if new1 != new2: return {'errors': {'password': {'new2': _("The new password and its confirmation must be identical.")}}} try: request.env['res.users'].change_password(old, new1) except AccessDenied as e: msg = e.args[0] if msg == AccessDenied().args[0]: msg = _('The old password you provided is incorrect, your password was not changed.') return {'errors': {'password': {'old': msg}}} except UserError as e: return {'errors': {'password': str(e)}} # update session token so the user does not get logged out (cache cleared by passwd change) new_token = request.env.user._compute_session_token(request.session.sid) request.session.session_token = new_token return {'success': {'password': True}} @route('/my/deactivate_account', type='http', auth='user', website=True, methods=['POST']) def deactivate_account(self, validation, password, **post): values = self._prepare_portal_layout_values() values['get_error'] = get_error values['open_deactivate_modal'] = True if validation != request.env.user.login: values['errors'] = {'deactivate': 'validation'} else: try: request.env['res.users']._check_credentials(password, {'interactive': True}) request.env.user.sudo()._deactivate_portal_user(**post) request.session.logout() return request.redirect('/web/login?message=%s' % urls.url_quote(_('Account deleted!'))) except AccessDenied: values['errors'] = {'deactivate': 'password'} except UserError as e: values['errors'] = {'deactivate': {'other': str(e)}} return request.render('portal.portal_my_security', values, headers={ 'X-Frame-Options': 'SAMEORIGIN', 'Content-Security-Policy': "frame-ancestors 'self'", }) @http.route('/portal/attachment/add', type='http', auth='public', methods=['POST'], website=True) def attachment_add(self, name, file, res_model, res_id, access_token=None, **kwargs): """Process a file uploaded from the portal chatter and create the corresponding `ir.attachment`. The attachment will be created "pending" until the associated message is actually created, and it will be garbage collected otherwise. :param name: name of the file to save. :type name: string :param file: the file to save :type file: werkzeug.FileStorage :param res_model: name of the model of the original document. To check access rights only, it will not be saved here. :type res_model: string :param res_id: id of the original document. To check access rights only, it will not be saved here. :type res_id: int :param access_token: access_token of the original document. To check access rights only, it will not be saved here. :type access_token: string :return: attachment data {id, name, mimetype, file_size, access_token} :rtype: dict """ try: self._document_check_access(res_model, int(res_id), access_token=access_token) except (AccessError, MissingError) as e: raise UserError(_("The document does not exist or you do not have the rights to access it.")) IrAttachment = request.env['ir.attachment'] # Avoid using sudo when not necessary: internal users can create attachments, # as opposed to public and portal users. if not request.env.user._is_internal(): IrAttachment = IrAttachment.sudo() # At this point the related message does not exist yet, so we assign # those specific res_model and res_is. They will be correctly set # when the message is created: see `portal_chatter_post`, # or garbage collected otherwise: see `_garbage_collect_attachments`. attachment = IrAttachment.create({ 'name': name, 'datas': base64.b64encode(file.read()), 'res_model': 'mail.compose.message', 'res_id': 0, 'access_token': IrAttachment._generate_access_token(), }) return request.make_response( data=json.dumps(attachment.read(['id', 'name', 'mimetype', 'file_size', 'access_token'])[0]), headers=[('Content-Type', 'application/json')] ) @http.route('/portal/attachment/remove', type='json', auth='public') def attachment_remove(self, attachment_id, access_token=None): """Remove the given `attachment_id`, only if it is in a "pending" state. The user must have access right on the attachment or provide a valid `access_token`. """ try: attachment_sudo = self._document_check_access('ir.attachment', int(attachment_id), access_token=access_token) except (AccessError, MissingError) as e: raise UserError(_("The attachment does not exist or you do not have the rights to access it.")) if attachment_sudo.res_model != 'mail.compose.message' or attachment_sudo.res_id != 0: raise UserError(_("The attachment %s cannot be removed because it is not in a pending state.", attachment_sudo.name)) if attachment_sudo.env['mail.message'].search([('attachment_ids', 'in', attachment_sudo.ids)]): raise UserError(_("The attachment %s cannot be removed because it is linked to a message.", attachment_sudo.name)) return attachment_sudo.unlink() def details_form_validate(self, data, partner_creation=False): error = dict() error_message = [] # Validation for field_name in self._get_mandatory_fields(): if not data.get(field_name): error[field_name] = 'missing' # email validation if data.get('email') and not tools.single_email_re.match(data.get('email')): error["email"] = 'error' error_message.append(_('Invalid Email! Please enter a valid email address.')) # vat validation partner = request.env.user.partner_id if data.get("vat") and partner and partner.vat != data.get("vat"): # Check the VAT if it is the public user too. if partner_creation or partner.can_edit_vat(): if hasattr(partner, "check_vat"): if data.get("country_id"): data["vat"] = request.env["res.partner"].fix_eu_vat_number(int(data.get("country_id")), data.get("vat")) partner_dummy = partner.new({ 'vat': data['vat'], 'country_id': (int(data['country_id']) if data.get('country_id') else False), }) try: partner_dummy.check_vat() except ValidationError as e: error["vat"] = 'error' error_message.append(e.args[0]) else: error_message.append(_('Changing VAT number is not allowed once document(s) have been issued for your account. Please contact us directly for this operation.')) # error message for empty required fields if [err for err in error.values() if err == 'missing']: error_message.append(_('Some required fields are empty.')) unknown = [k for k in data if k not in self._get_mandatory_fields() + self._get_optional_fields()] if unknown: error['common'] = 'Unknown field' error_message.append("Unknown field '%s'" % ','.join(unknown)) return error, error_message def _get_mandatory_fields(self): """ This method is there so that we can override the mandatory fields """ return self.MANDATORY_BILLING_FIELDS def _get_optional_fields(self): """ This method is there so that we can override the optional fields """ return self.OPTIONAL_BILLING_FIELDS def _document_check_access(self, model_name, document_id, access_token=None): """Check if current user is allowed to access the specified record. :param str model_name: model of the requested record :param int document_id: id of the requested record :param str access_token: record token to check if user isn't allowed to read requested record :return: expected record, SUDOED, with SUPERUSER context :raise MissingError: record not found in database, might have been deleted :raise AccessError: current user isn't allowed to read requested document (and no valid token was given) """ document = request.env[model_name].browse([document_id]) document_sudo = document.with_user(SUPERUSER_ID).exists() if not document_sudo: raise MissingError(_("This document does not exist.")) try: document.check_access_rights('read') document.check_access_rule('read') except AccessError: if not access_token or not document_sudo.access_token or not consteq(document_sudo.access_token, access_token): raise return document_sudo def _get_page_view_values(self, document, access_token, values, session_history, no_breadcrumbs, **kwargs): """Include necessary values for portal chatter & pager setup (see template portal.message_thread). :param document: record to display on portal :param str access_token: provided document access token :param dict values: base dict of values where chatter rendering values should be added :param str session_history: key used to store latest records browsed on the portal in the session :param bool no_breadcrumbs: :return: updated values :rtype: dict """ values['object'] = document if access_token: # if no_breadcrumbs = False -> force breadcrumbs even if access_token to `invite` users to register if they click on it values['no_breadcrumbs'] = no_breadcrumbs values['access_token'] = access_token values['token'] = access_token # for portal chatter # Those are used notably whenever the payment form is implied in the portal. if kwargs.get('error'): values['error'] = kwargs['error'] if kwargs.get('warning'): values['warning'] = kwargs['warning'] if kwargs.get('success'): values['success'] = kwargs['success'] # Email token for posting messages in portal view with identified author if kwargs.get('pid'): values['pid'] = kwargs['pid'] if kwargs.get('hash'): values['hash'] = kwargs['hash'] history = request.session.get(session_history, []) values.update(get_records_pager(history, document)) return values def _show_report(self, model, report_type, report_ref, download=False): if report_type not in ('html', 'pdf', 'text'): raise UserError(_("Invalid report type: %s", report_type)) ReportAction = request.env['ir.actions.report'].sudo() if hasattr(model, 'company_id'): if len(model.company_id) > 1: raise UserError(_('Multi company reports are not supported.')) ReportAction = ReportAction.with_company(model.company_id) method_name = '_render_qweb_%s' % (report_type) report = getattr(ReportAction, method_name)(report_ref, list(model.ids), data={'report_type': report_type})[0] headers = self._get_http_headers(model, report_type, report, download) return request.make_response(report, headers=list(headers.items())) def _get_http_headers(self, model, report_type, report, download): headers = { 'Content-Type': 'application/pdf' if report_type == 'pdf' else 'text/html', 'Content-Length': len(report), } if report_type == 'pdf' and download: filename = "%s.pdf" % (re.sub('\W+', '-', model._get_report_base_filename())) headers['Content-Disposition'] = content_disposition(filename) return headers def get_error(e, path=''): """ Recursively dereferences `path` (a period-separated sequence of dict keys) in `e` (an error dict or value), returns the final resolution IIF it's an str, otherwise returns None """ for k in (path.split('.') if path else []): if not isinstance(e, dict): return None e = e.get(k) return e if isinstance(e, str) else None