# -*- coding: utf-8 -*- # Part of Odoo. See LICENSE file for full copyright and licensing details. import logging import mimetypes import requests import werkzeug.utils from werkzeug.urls import url_encode from odoo import http, tools, _ from odoo.http import request from odoo.tools.mimetypes import guess_mimetype from odoo.addons.web_editor.controllers.main import Web_Editor logger = logging.getLogger(__name__) class Web_Unsplash(http.Controller): def _get_access_key(self): """ Use this method to get the key, needed for internal reason """ return request.env['ir.config_parameter'].sudo().get_param('unsplash.access_key') def _notify_download(self, url): ''' Notifies Unsplash from an image download. (API requirement) :param url: the download_url of the image to be notified This method won't return anything. This endpoint should just be pinged with a simple GET request for Unsplash to increment the image view counter. ''' try: if not url.startswith('https://api.unsplash.com/photos/') and not request.env.registry.in_test_mode(): raise Exception(_("ERROR: Unknown Unsplash notify URL!")) access_key = self._get_access_key() requests.get(url, params=url_encode({'client_id': access_key})) except Exception as e: logger.exception("Unsplash download notification failed: " + str(e)) # ------------------------------------------------------ # add unsplash image url # ------------------------------------------------------ @http.route('/web_unsplash/attachment/add', type='json', auth='user', methods=['POST']) def save_unsplash_url(self, unsplashurls=None, **kwargs): """ unsplashurls = { image_id1: { url: image_url, download_url: download_url, }, image_id2: { url: image_url, download_url: download_url, }, ..... } """ def slugify(s): ''' Keeps only alphanumeric characters, hyphens and spaces from a string. The string will also be truncated to 1024 characters max. :param s: the string to be filtered :return: the sanitized string ''' return "".join([c for c in s if c.isalnum() or c in list("- ")])[:1024] if not unsplashurls: return [] uploads = [] query = kwargs.get('query', '') query = slugify(query) res_model = kwargs.get('res_model', 'ir.ui.view') if res_model != 'ir.ui.view' and kwargs.get('res_id'): res_id = int(kwargs['res_id']) else: res_id = None for key, value in unsplashurls.items(): url = value.get('url') try: if not url.startswith(('https://images.unsplash.com/', 'https://plus.unsplash.com/')) and not request.env.registry.in_test_mode(): logger.exception("ERROR: Unknown Unsplash URL!: " + url) raise Exception(_("ERROR: Unknown Unsplash URL!")) req = requests.get(url) if req.status_code != requests.codes.ok: continue # get mime-type of image url because unsplash url dosn't contains mime-types in url image = req.content except requests.exceptions.ConnectionError as e: logger.exception("Connection Error: " + str(e)) continue except requests.exceptions.Timeout as e: logger.exception("Timeout: " + str(e)) continue image = tools.image_process(image, verify_resolution=True) mimetype = guess_mimetype(image) # append image extension in name query += mimetypes.guess_extension(mimetype) or '' # /unsplash/5gR788gfd/lion url_frags = ['unsplash', key, query] attachment_data = { 'name': '_'.join(url_frags), 'url': '/' + '/'.join(url_frags), 'data': image, 'res_id': res_id, 'res_model': res_model, } attachment = Web_Editor._attachment_create(self, **attachment_data) if value.get('description'): attachment.description = value.get('description') attachment.generate_access_token() uploads.append(attachment._get_media_info()) # Notifies Unsplash from an image download. (API requirement) self._notify_download(value.get('download_url')) return uploads @http.route("/web_unsplash/fetch_images", type='json', auth="user") def fetch_unsplash_images(self, **post): access_key = self._get_access_key() app_id = self.get_unsplash_app_id() if not access_key or not app_id: if not request.env.user._can_manage_unsplash_settings(): return {'error': 'no_access'} return {'error': 'key_not_found'} post['client_id'] = access_key response = requests.get('https://api.unsplash.com/search/photos/', params=url_encode(post)) if response.status_code == requests.codes.ok: return response.json() else: if not request.env.user._can_manage_unsplash_settings(): return {'error': 'no_access'} return {'error': response.status_code} @http.route("/web_unsplash/get_app_id", type='json', auth="public") def get_unsplash_app_id(self, **post): return request.env['ir.config_parameter'].sudo().get_param('unsplash.app_id') @http.route("/web_unsplash/save_unsplash", type='json', auth="user") def save_unsplash(self, **post): if request.env.user._can_manage_unsplash_settings(): request.env['ir.config_parameter'].sudo().set_param('unsplash.app_id', post.get('appId')) request.env['ir.config_parameter'].sudo().set_param('unsplash.access_key', post.get('key')) return True raise werkzeug.exceptions.NotFound()