# -*- coding: utf-8 -*- # Part of Odoo. See LICENSE file for full copyright and licensing details. from datetime import datetime from dateutil.relativedelta import relativedelta from freezegun import freeze_time from unittest.mock import patch from odoo import exceptions, fields, _ from odoo.addons.mail.tests.common import mail_new_test_user from odoo.tests import common class TestKarmaTrackingCommon(common.TransactionCase): @classmethod def setUpClass(cls): super(TestKarmaTrackingCommon, cls).setUpClass() cls.test_user = mail_new_test_user( cls.env, login='test', name='Test User', email='test@example.com', karma=0, groups='base.group_user', ) cls.test_user_2 = mail_new_test_user( cls.env, login='test2', name='Test User 2', email='test2@example.com', karma=0, groups='base.group_user', ) cls.env['gamification.karma.tracking'].search([]).unlink() cls.test_date = datetime(2021, 6, 1) cls.first_day_of_test_date_month = '2021-06-01' cls.first_day_of_test_date_next_month = '2021-07-01' @classmethod def _create_trackings(cls, user, karma, steps, track_date, days_delta=1): old_value = user.karma for step in range(steps): new_value = old_value + karma cls.env['gamification.karma.tracking'].create([{ 'user_id': user.id, 'old_value': old_value, 'new_value': new_value, 'consolidated': False, 'tracking_date': fields.Datetime.to_string(track_date) }]) old_value = new_value track_date = track_date + relativedelta(days=days_delta) def test_computation_gain(self): self._create_trackings(self.test_user, 20, 2, self.test_date, days_delta=30) self._create_trackings(self.test_user_2, 10, 20, self.test_date, days_delta=2) results = (self.test_user | self.test_user_2)._get_tracking_karma_gain_position([]) self.assertEqual(results[0]['user_id'], self.test_user_2.id) self.assertEqual(results[0]['karma_gain_total'], 200) self.assertEqual(results[0]['karma_position'], 1) self.assertEqual(results[1]['user_id'], self.test_user.id) self.assertEqual(results[1]['karma_gain_total'], 40) self.assertEqual(results[1]['karma_position'], 2) results = (self.test_user | self.test_user_2)._get_tracking_karma_gain_position([], to_date=self.test_date + relativedelta(day=2)) self.assertEqual(results[0]['user_id'], self.test_user.id) self.assertEqual(results[0]['karma_gain_total'], 20) self.assertEqual(results[0]['karma_position'], 1) self.assertEqual(results[1]['user_id'], self.test_user_2.id) self.assertEqual(results[1]['karma_gain_total'], 10) self.assertEqual(results[1]['karma_position'], 2) results = (self.test_user | self.test_user_2)._get_tracking_karma_gain_position([], from_date=self.test_date + relativedelta(months=1, day=1)) self.assertEqual(results[0]['user_id'], self.test_user_2.id) self.assertEqual(results[0]['karma_gain_total'], 50) self.assertEqual(results[0]['karma_position'], 1) self.assertEqual(results[1]['user_id'], self.test_user.id) self.assertEqual(results[1]['karma_gain_total'], 20) self.assertEqual(results[1]['karma_position'], 2) results = self.env['res.users']._get_tracking_karma_gain_position([]) self.assertEqual(len(results), 0) @freeze_time('2021-02-02') def test_consolidation_cron(self): Tracking = self.env['gamification.karma.tracking'] # Sanity check self.assertFalse(Tracking.search_count([('user_id', 'in', (self.test_user | self.test_user_2).ids)])) test_date = datetime(2020, 12, 15) first_day_of_test_date_month = '2020-12-01' first_day_of_test_date_next_month = '2021-01-01' self._create_trackings(self.test_user, karma=20, steps=2, track_date=test_date, days_delta=30) self._create_trackings(self.test_user_2, karma=10, steps=20, track_date=test_date, days_delta=2) # Sanity check self.assertEqual(Tracking.search_count([('user_id', '=', self.test_user.id)]), 2) self.assertEqual(Tracking.search_count([('user_id', '=', self.test_user_2.id)]), 20) self.assertEqual(self.test_user.karma, 40) self.assertEqual(self.test_user_2.karma, 200) with self.assertQueryCount(8), patch.object(type(self.env['res.users']), 'write') as patched_user_write: Tracking._consolidate_cron() # consolidation should not change user karma self.assertFalse(patched_user_write.called, "User karma didn't change during consolidation, it should not be updated") self.assertEqual(self.test_user.karma, 40) self.assertEqual(self.test_user_2.karma, 200) consolidated_1 = Tracking.search([ ('user_id', '=', self.test_user.id), ('tracking_date', '>=', first_day_of_test_date_month), ('tracking_date', '<', first_day_of_test_date_next_month), ]) self.assertEqual(len(consolidated_1), 1) self.assertTrue(consolidated_1.consolidated) self.assertEqual(consolidated_1.old_value, 0) self.assertEqual(consolidated_1.new_value, 20) self.assertEqual(consolidated_1.reason, 'Consolidation from 2020-12-01 to 2020-12-31') consolidated_2 = Tracking.search([ ('user_id', '=', self.test_user_2.id), ('tracking_date', '>=', first_day_of_test_date_month), ('tracking_date', '<', first_day_of_test_date_next_month), ]) self.assertEqual(len(consolidated_2), 1) self.assertTrue(consolidated_2.consolidated) self.assertEqual(consolidated_2.old_value, 0) self.assertEqual(consolidated_2.new_value, 10 * 9) # 9 records have been consolidated self.assertEqual(consolidated_2.reason, 'Consolidation from 2020-12-01 to 2020-12-31') unconsolidated_1 = Tracking.search_count([ ('user_id', '=', self.test_user.id), ('consolidated', '=', False), ]) self.assertEqual(unconsolidated_1, 1) unconsolidated_2 = Tracking.search_count([ ('user_id', '=', self.test_user_2.id), ('consolidated', '=', False), ]) self.assertEqual(unconsolidated_2, 11) def test_consolidation_monthly(self): Tracking = self.env['gamification.karma.tracking'] base_test_user_karma = self.test_user.karma base_test_user_2_karma = self.test_user_2.karma self._create_trackings(self.test_user, 20, 2, self.test_date, days_delta=30) self._create_trackings(self.test_user_2, 10, 20, self.test_date, days_delta=2) Tracking._process_consolidate(self.test_date) consolidated = Tracking.search([ ('user_id', '=', self.test_user_2.id), ('tracking_date', '>=', self.first_day_of_test_date_month), ('tracking_date', '<', self.first_day_of_test_date_next_month), ]) self.assertEqual(len(consolidated), 1) self.assertTrue(consolidated.consolidated) self.assertEqual(consolidated.old_value, base_test_user_2_karma) self.assertEqual(consolidated.new_value, base_test_user_2_karma + 150) # 15 2-days span, from 1 to 29 included = 15 steps -> 150 karma remaining = Tracking.search([ ('user_id', '=', self.test_user_2.id), ('consolidated', '=', False) ]) self.assertEqual(len(remaining), 5) # 15 steps consolidated, remaining 5 self.assertEqual(remaining[0].tracking_date, self.test_date + relativedelta(months=1, day=9)) # ordering: last first self.assertEqual(remaining[-1].tracking_date, self.test_date + relativedelta(months=1, day=1)) Tracking._process_consolidate(self.test_date + relativedelta(months=1)) consolidated = Tracking.search([ ('user_id', '=', self.test_user_2.id), ('consolidated', '=', True), ]) self.assertEqual(len(consolidated), 2) self.assertEqual(consolidated[0].new_value, base_test_user_2_karma + 200) # 5 remaining 2-days span, from 1 to 9 included = 5 steps -> 50 karma self.assertEqual(consolidated[0].old_value, base_test_user_2_karma + 150) # coming from previous iteration self.assertEqual(consolidated[0].tracking_date.date(), self.test_date.date() + relativedelta(months=1)) # tracking set at beginning of month self.assertEqual(consolidated[-1].new_value, base_test_user_2_karma + 150) # previously created one still present self.assertEqual(consolidated[-1].old_value, base_test_user_2_karma) # previously created one still present remaining = Tracking.search([ ('user_id', '=', self.test_user_2.id), ('consolidated', '=', False) ]) self.assertFalse(remaining) # current user not-in-details tests current_user_trackings = Tracking.search([ ('user_id', '=', self.test_user.id), ]) self.assertEqual(len(current_user_trackings), 2) self.assertEqual(current_user_trackings[0].new_value, base_test_user_karma + 40) self.assertEqual(current_user_trackings[-1].old_value, base_test_user_karma) def test_user_as_erp_manager(self): self.test_user.write({'groups_id': [ (4, self.env.ref('base.group_partner_manager').id), (4, self.env.ref('base.group_erp_manager').id) ]}) user = self.env['res.users'].with_user(self.test_user).create({ 'name': 'Test Ostérone', 'karma': '32', 'login': 'dummy', 'email': 'dummy@example.com', }) with self.assertRaises(exceptions.AccessError): user.read(['karma_tracking_ids']) user._add_karma(38, source=self.test_user_2) self.assertEqual(user.karma, 70) trackings = self.env['gamification.karma.tracking'].sudo().search( [('user_id', '=', user.id)], order="create_date ASC, id ASC") self.assertEqual(len(trackings), 2) # create + add_karma self.assertEqual(trackings[0].origin_ref, self.test_user) self.assertEqual(trackings[0].reason, "User Creation (Test User #%i)" % self.test_user.id) self.assertEqual(trackings[1].origin_ref, self.test_user_2) self.assertIn("Add Manually", trackings[1].reason) self.assertIn(self.test_user_2.display_name, trackings[1].reason) self.assertIn(str(self.test_user_2.id), trackings[1].reason) def test_user_tracking(self): self.test_user.write({'groups_id': [ (4, self.env.ref('base.group_partner_manager').id), (4, self.env.ref('base.group_system').id) ]}) user = self.env['res.users'].with_user(self.test_user).create({ 'name': 'Test Ostérone', 'karma': '32', 'login': 'dummy', 'email': 'dummy@example.com', }) self.assertEqual(user.karma, 32) self.assertEqual(len(user.karma_tracking_ids), 1) self.assertEqual(user.karma_tracking_ids.old_value, 0) self.assertEqual(user.karma_tracking_ids.new_value, 32) user._add_karma(38) self.assertEqual(user.karma, 70) self.assertEqual(len(user.karma_tracking_ids), 2) self.assertEqual(user.karma_tracking_ids[1].old_value, 32) self.assertEqual(user.karma_tracking_ids[1].new_value, 70) self.assertIn(_('Add Manually'), user.karma_tracking_ids[1].reason) self.assertIn(self.test_user.display_name, user.karma_tracking_ids[1].reason) self.assertIn(str(self.test_user.id), user.karma_tracking_ids[1].reason) self.assertEqual(user.karma_tracking_ids[0].old_value, 0) self.assertEqual(user.karma_tracking_ids[0].new_value, 32) user._add_karma(69, user, _('Test Reason')) self.assertEqual(len(user.karma_tracking_ids), 3) self.assertIn(_('Test Reason'), user.karma_tracking_ids[2].reason) self.assertEqual(user.karma, 139) # add manually karma to a user (e.g. from the technical view) tracking = self.env['gamification.karma.tracking'].create({ 'user_id': user.id, 'new_value': 150, 'consolidated': False, }) self.assertEqual(tracking.old_value, 139) self.assertEqual(tracking.gain, 11) self.assertEqual(user.karma, 150) # write directly on the karma field, should generate self.test_user_2.karma = 100 # won't change last_tracking_3 = self.test_user_2.karma_tracking_ids[-1] users = (user | self.test_user | self.test_user_2).with_user(self.test_user) with self.assertQueryCount(12): users.karma = 100 tracking_1 = user.karma_tracking_ids[-1] tracking_2 = self.test_user.karma_tracking_ids[-1] tracking_3 = self.test_user_2.karma_tracking_ids[-1] self.assertEqual(user.karma, 100) self.assertEqual(self.test_user.karma, 100) self.assertEqual(tracking_1.new_value, 100) self.assertEqual(tracking_1.old_value, 150) self.assertEqual(tracking_1.gain, -50) self.assertEqual(tracking_1.reason, "Add Manually (Test User #%i)" % self.test_user.id) self.assertEqual(tracking_1.origin_ref, self.test_user) self.assertEqual(tracking_2.new_value, 100) self.assertEqual(tracking_2.old_value, 0) self.assertEqual(tracking_2.gain, 100) self.assertEqual(tracking_2.reason, "Add Manually (Test User #%i)" % self.test_user.id) self.assertEqual(tracking_2.origin_ref, self.test_user) self.assertEqual(last_tracking_3, tracking_3, "Shouldn't have created a new tracking for the third user") self.assertEqual(tracking_3.new_value, 100) self.assertEqual(tracking_3.old_value, 0) self.assertEqual(tracking_3.gain, 100) class TestComputeRankCommon(common.TransactionCase): @classmethod def setUpClass(cls): super(TestComputeRankCommon, cls).setUpClass() def _patched_send_mail(*args, **kwargs): pass patch_email = patch('odoo.addons.mail.models.mail_template.MailTemplate.send_mail', _patched_send_mail) cls.startClassPatcher(patch_email) cls.users = cls.env['res.users'] for k in range(-5, 1030, 30): cls.users += mail_new_test_user( cls.env, name=str(k), login="test_recompute_rank_%s" % k, karma=k, ) cls.env['gamification.karma.rank'].search([]).unlink() cls.rank_1 = cls.env['gamification.karma.rank'].create({ 'name': 'rank 1', 'karma_min': 1, }) cls.rank_2 = cls.env['gamification.karma.rank'].create({ 'name': 'rank 2', 'karma_min': 250, }) cls.rank_3 = cls.env['gamification.karma.rank'].create({ 'name': 'rank 3', 'karma_min': 500, }) cls.rank_4 = cls.env['gamification.karma.rank'].create({ 'name': 'rank 4', 'karma_min': 1000, }) def test_00_initial_compute(self): self.assertEqual(len(self.users), 35) self.assertEqual( len(self.rank_1.user_ids & self.users), len([u for u in self.users if u.karma >= self.rank_1.karma_min and u.karma < self.rank_2.karma_min]) ) self.assertEqual( len(self.rank_2.user_ids & self.users), len([u for u in self.users if u.karma >= self.rank_2.karma_min and u.karma < self.rank_3.karma_min]) ) self.assertEqual( len(self.rank_3.user_ids & self.users), len([u for u in self.users if u.karma >= self.rank_3.karma_min and u.karma < self.rank_4.karma_min]) ) self.assertEqual( len(self.rank_4.user_ids & self.users), len([u for u in self.users if u.karma >= self.rank_4.karma_min]) ) def test_01_switch_rank(self): self.assertEqual(len(self.users), 35) self.rank_3.karma_min = 100 # rank_1 -> rank_3 -> rank_2 -> rank_4 self.assertEqual( len(self.rank_1.user_ids & self.users), len([u for u in self.users if u.karma >= self.rank_1.karma_min and u.karma < self.rank_3.karma_min]) ) self.assertEqual( len(self.rank_3.user_ids & self.users), len([u for u in self.users if u.karma >= self.rank_3.karma_min and u.karma < self.rank_2.karma_min]) ) self.assertEqual( len(self.rank_2.user_ids & self.users), len([u for u in self.users if u.karma >= self.rank_2.karma_min and u.karma < self.rank_4.karma_min]) ) self.assertEqual( len(self.rank_4.user_ids & self.users), len([u for u in self.users if u.karma >= self.rank_4.karma_min]) ) def test_02_update_rank_without_switch(self): number_of_users = False def _patched_recompute_rank(_self, *args, **kwargs): nonlocal number_of_users number_of_users = len(_self & self.users) patch_bulk = patch('odoo.addons.gamification.models.res_users.Users._recompute_rank', _patched_recompute_rank) self.startPatcher(patch_bulk) self.rank_3.karma_min = 700 self.assertEqual(number_of_users, 7, "Should just recompute for the 7 users between 500 and 700") def test_03_test_bulk_call(self): self.assertEqual(len(self.users), 35) def _patched_check_in_bulk(*args, **kwargs): raise patch_bulk = patch('odoo.addons.gamification.models.res_users.Users._recompute_rank_bulk', _patched_check_in_bulk) self.startPatcher(patch_bulk) # call on 5 users should not trigger the bulk function self.users[0:5]._recompute_rank() # call on 50 users should trigger the bulk function with self.assertRaises(Exception): self.users[0:50]._recompute_rank()