# -*- coding: utf-8 -*- import logging from datetime import datetime from datetime import timedelta from pytz import timezone, utc from datetime import datetime, time from collections import defaultdict from dateutil.relativedelta import relativedelta import pytz from odoo import models, fields from odoo.addons.resource.models.utils import HOURS_PER_DAY _logger = logging.getLogger(__name__) class Employee(models.Model): _inherit = "hr.employee" def _get_consumed_leaves(self, leave_types, target_date=False, ignore_future=False): employees = self or self._get_contextual_employee() leaves_domain = [ ("holiday_status_id", "in", leave_types.ids), ("employee_id", "in", employees.ids), ("state", "in", ["confirm", "validate1", "validate"]), ] if self.env.context.get("ignored_leave_ids"): leaves_domain.append( ("id", "not in", self.env.context.get("ignored_leave_ids")) ) if not target_date: target_date = fields.Date.today() if ignore_future: leaves_domain.append(("date_from", "<=", target_date)) leaves = self.env["hr.leave"].search(leaves_domain) leaves_per_employee_type = defaultdict( lambda: defaultdict(lambda: self.env["hr.leave"]) ) for leave in leaves: leaves_per_employee_type[leave.employee_id][ leave.holiday_status_id ] |= leave allocations = ( self.env["hr.leave.allocation"] .with_context(active_test=False) .search( [ ("employee_id", "in", employees.ids), ("holiday_status_id", "in", leave_types.ids), ("state", "=", "validate"), ] ) .filtered(lambda al: al.active or not al.employee_id.active) ) allocations_per_employee_type = defaultdict( lambda: defaultdict(lambda: self.env["hr.leave.allocation"]) ) for allocation in allocations: allocations_per_employee_type[allocation.employee_id][ allocation.holiday_status_id ] |= allocation # _get_consumed_leaves returns a tuple of two dictionnaries. # 1) The first is a dictionary to map the number of days/hours of leaves taken per allocation # The structure is the following: # - KEYS: # allocation_leaves_consumed # |--employee_id # |--holiday_status_id # |--allocation # |--virtual_leaves_taken # |--leaves_taken # |--virtual_remaining_leaves # |--remaining_leaves # |--max_leaves # |--accrual_bonus # - VALUES: # Integer representing the number of (virtual) remaining leaves, (virtual) leaves taken or max leaves # for each allocation. # leaves_taken and remaining_leaves only take into account validated leaves, while the "virtual" equivalent are # also based on leaves in "confirm" or "validate1" state. # Accrual bonus gives the amount of additional leaves that will have been granted at the given # target_date in comparison to today. # The unit is in hour or days depending on the leave type request unit # 2) The second is a dictionary mapping the remaining days per employee and per leave type that are either # not taken into account by the allocations, mainly because accruals don't take future leaves into account. # This is used to warn the user if the leaves they takes bring them above their available limit. # - KEYS: # allocation_leaves_consumed # |--employee_id # |--holiday_status_id # |--to_recheck_leaves # |--excess_days # |--exceeding_duration # - VALUES: # "to_recheck_leaves" stores every leave that is not yet taken into account by the "allocation_leaves_consumed" dictionary. # "excess_days" represents the excess amount that somehow isn't taken into account by the first dictionary. # "exceeding_duration" sum up the to_recheck_leaves duration and compares it to the maximum allocated for that time period. allocations_leaves_consumed = defaultdict( lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: 0))) ) to_recheck_leaves_per_leave_type = defaultdict( lambda: defaultdict( lambda: { "excess_days": defaultdict( lambda: { "amount": 0, "is_virtual": True, } ), "exceeding_duration": 0, "to_recheck_leaves": self.env["hr.leave"], } ) ) for allocation in allocations: allocation_data = allocations_leaves_consumed[allocation.employee_id][ allocation.holiday_status_id ][allocation] future_leaves = 0 if allocation.allocation_type == "accrual": future_leaves = allocation._get_future_leaves_on(target_date) max_leaves = ( allocation.number_of_hours_display if allocation.type_request_unit in ["hour"] else allocation.number_of_days_display ) max_leaves += future_leaves allocation_data.update( { "max_leaves": max_leaves, "accrual_bonus": future_leaves, "virtual_remaining_leaves": max_leaves, "remaining_leaves": max_leaves, "leaves_taken": 0, "virtual_leaves_taken": 0, } ) for employee in employees: for leave_type in leave_types: allocations_with_date_to = self.env["hr.leave.allocation"] allocations_without_date_to = self.env["hr.leave.allocation"] for leave_allocation in allocations_per_employee_type[employee][ leave_type ]: if leave_allocation.date_to: allocations_with_date_to |= leave_allocation else: allocations_without_date_to |= leave_allocation sorted_leave_allocations = ( allocations_with_date_to.sorted(key="date_to") + allocations_without_date_to ) if leave_type.request_unit in ["c_day", "day", "half_day"]: leave_duration_field = "number_of_days" leave_unit = "days" else: leave_duration_field = "number_of_hours_display" leave_unit = "hours" leave_type_data = allocations_leaves_consumed[employee][leave_type] for leave in leaves_per_employee_type[employee][leave_type].sorted( "date_from" ): leave_duration = leave[leave_duration_field] skip_excess = False if ( sorted_leave_allocations.filtered( lambda alloc: alloc.allocation_type == "accrual" ) and leave.date_from.date() > target_date ): to_recheck_leaves_per_leave_type[employee][leave_type][ "to_recheck_leaves" ] |= leave skip_excess = True continue if leave_type.requires_allocation == "yes": for allocation in sorted_leave_allocations: # We don't want to include future leaves linked to accruals into the total count of available leaves. # However, we'll need to check if those leaves take more than what will be accrued in total of those days # to give a warning if the total exceeds what will be accrued. if allocation.date_from > leave.date_to.date() or ( allocation.date_to and allocation.date_to < leave.date_from.date() ): continue interval_start = max( leave.date_from, datetime.combine(allocation.date_from, time.min), ) interval_end = min( leave.date_to, datetime.combine(allocation.date_to, time.max) if allocation.date_to else leave.date_to, ) duration = leave[leave_duration_field] if ( leave.date_from != interval_start or leave.date_to != interval_end ): duration_info = employee._get_calendar_attendances( interval_start.replace(tzinfo=pytz.UTC), interval_end.replace(tzinfo=pytz.UTC), ) duration = duration_info[ "hours" if leave_unit == "hours" else "days" ] max_allowed_duration = min( duration, leave_type_data[allocation]["virtual_remaining_leaves"], ) if not max_allowed_duration: continue allocated_time = min(max_allowed_duration, leave_duration) leave_type_data[allocation][ "virtual_leaves_taken" ] += allocated_time leave_type_data[allocation][ "virtual_remaining_leaves" ] -= allocated_time if leave.state == "validate": leave_type_data[allocation][ "leaves_taken" ] += allocated_time leave_type_data[allocation][ "remaining_leaves" ] -= allocated_time leave_duration -= allocated_time if not leave_duration: break if round(leave_duration, 2) > 0 and not skip_excess: to_recheck_leaves_per_leave_type[employee][leave_type][ "excess_days" ][leave.date_to.date()] = { "amount": leave_duration, "is_virtual": leave.state != "validate", "leave_id": leave.id, } else: if leave_unit == "hours": allocated_time = leave.number_of_hours_display else: allocated_time = leave.number_of_days_display leave_type_data[False]["virtual_leaves_taken"] += allocated_time leave_type_data[False]["virtual_remaining_leaves"] = 0 leave_type_data[False]["remaining_leaves"] = 0 if leave.state == "validate": leave_type_data[False]["leaves_taken"] += allocated_time for employee in to_recheck_leaves_per_leave_type: for leave_type in to_recheck_leaves_per_leave_type[employee]: content = to_recheck_leaves_per_leave_type[employee][leave_type] consumed_content = allocations_leaves_consumed[employee][leave_type] if content["to_recheck_leaves"]: date_to_simulate = max( content["to_recheck_leaves"].mapped("date_from") ).date() latest_accrual_bonus = 0 date_accrual_bonus = 0 virtual_remaining = 0 additional_leaves_duration = 0 for allocation in consumed_content: latest_accrual_bonus += ( allocation and allocation._get_future_leaves_on(date_to_simulate) ) date_accrual_bonus += consumed_content[allocation][ "accrual_bonus" ] virtual_remaining += consumed_content[allocation][ "virtual_remaining_leaves" ] for leave in content["to_recheck_leaves"]: additional_leaves_duration += ( leave.number_of_hours if leave_type.request_unit == "hours" else leave.number_of_days ) latest_remaining = ( virtual_remaining - date_accrual_bonus + latest_accrual_bonus ) content["exceeding_duration"] = round( min(0, latest_remaining - additional_leaves_duration), 2 ) return (allocations_leaves_consumed, to_recheck_leaves_per_leave_type) class ResourceMixin(models.AbstractModel): _inherit = "resource.mixin" def get_all_days_data(self, date_from, date_to, exclude_holidays): """ Returns days amount as float, like 'get_work_days_data()' """ # Set timezone if no timezone is explicitly given user_tz = timezone(self.env.context.get("tz") or self.tz) if not user_tz: user_tz = utc if not date_from.tzinfo: date_from = user_tz.localize(date_from) if not date_to.tzinfo: date_to = user_tz.localize(date_to) # Bring booth dates to the same timezone if date_from.tzinfo != date_to.tzinfo: date_to = date_to.astimezone(date_from.tzinfo) # Handle holidays in selected period holidays_dur = timedelta(0) if exclude_holidays: holidays_dur = ( self.env.user.company_id.resource_calendar_id.get_holidays_duration( date_from, date_to ) ) duration = date_to.date() - date_from.date() + timedelta(1) - holidays_dur days = float(duration.days) hours = days * HOURS_PER_DAY return (days, hours) class ResourceCalendar(models.Model): _inherit = "resource.calendar" def get_holidays_duration(self, start_dt, end_dt): """ Returns timedelta object which is duration of all holidays included into specified period. """ days = set() holidays_ids = self.env["holidays.calendar.leaves"].search( [("type_transfer_day", "=", "is_holiday")] ) for holiday in holidays_ids: # Transformation date to datetime date_from = datetime( year=holiday.date_from.year, month=holiday.date_from.month, day=holiday.date_from.day, hour=0, minute=0, second=0, ) date_to = datetime( year=holiday.date_to.year, month=holiday.date_to.month, day=holiday.date_to.day, hour=23, minute=59, second=59, ) # Dates in database are without timezones, and time in fact for UTC timezone, # so it should be added explicitly. holiday_date_from = utc.localize(date_from) if date_from else None holiday_date_to = utc.localize(date_to) if date_to else None # Bring booth dates to the holiday timezone if holiday_date_from and start_dt.tzinfo != holiday_date_from.tzinfo: start_dt = start_dt.astimezone(holiday_date_from.tzinfo) if holiday_date_to and end_dt.tzinfo != holiday_date_to.tzinfo: end_dt = end_dt.astimezone(holiday_date_to.tzinfo) # Check periods intersection if ( holiday_date_from and end_dt < holiday_date_from or holiday_date_to and start_dt > holiday_date_to ): continue # Handle this holiday start = start_dt.date() if holiday_date_from: start = max(start, holiday_date_from.date()) until = end_dt.date() if holiday_date_to: until = min(until, holiday_date_to.date()) for i in range((until - start + timedelta(1)).days): days.add(start + timedelta(i)) return timedelta(len(days))