hr_holidays_ru/models/resource.py

391 lines
17 KiB
Python
Raw Permalink Normal View History

# -*- coding: utf-8 -*-
import logging
from datetime import datetime
from datetime import timedelta
from pytz import timezone, utc
from datetime import datetime, time
from collections import defaultdict
from dateutil.relativedelta import relativedelta
import pytz
from odoo import models, fields
from odoo.addons.resource.models.utils import HOURS_PER_DAY
_logger = logging.getLogger(__name__)
class Employee(models.Model):
    """Extension of ``hr.employee`` computing how leaves consume allocations."""

    _inherit = "hr.employee"

    def _get_consumed_leaves(
        self, leave_types, target_date=False, ignore_future=False
    ):
        """Distribute the employees' leaves over their validated allocations.

        :param leave_types: recordset of leave types to consider; its ``ids``
            filter both the leaves and the allocations that are searched.
        :param target_date: date used to project accrual allocations
            (``_get_future_leaves_on``) and to detect future leaves; defaults
            to ``fields.Date.today()`` when falsy.
        :param ignore_future: when truthy, only leaves whose ``date_from`` is
            lower than or equal to ``target_date`` are searched.
        :return: tuple ``(allocations_leaves_consumed,
            to_recheck_leaves_per_leave_type)``; both structures are described
            in the comment block inside the method body.
        """
        employees = self or self._get_contextual_employee()
        leaves_domain = [
            ("holiday_status_id", "in", leave_types.ids),
            ("employee_id", "in", employees.ids),
            ("state", "in", ["confirm", "validate1", "validate"]),
        ]
        if self.env.context.get("ignored_leave_ids"):
            leaves_domain.append(
                ("id", "not in", self.env.context.get("ignored_leave_ids"))
            )
        if not target_date:
            target_date = fields.Date.today()
        if ignore_future:
            leaves_domain.append(("date_from", "<=", target_date))
        leaves = self.env["hr.leave"].search(leaves_domain)

        # Group the leaves by employee, then by leave type.
        leaves_per_employee_type = defaultdict(
            lambda: defaultdict(lambda: self.env["hr.leave"])
        )
        for leave in leaves:
            leaves_per_employee_type[leave.employee_id][
                leave.holiday_status_id
            ] |= leave

        # Archived allocations are searched too, but are only kept when the
        # employee itself is archived.
        allocations = (
            self.env["hr.leave.allocation"]
            .with_context(active_test=False)
            .search(
                [
                    ("employee_id", "in", employees.ids),
                    ("holiday_status_id", "in", leave_types.ids),
                    ("state", "=", "validate"),
                ]
            )
            .filtered(lambda al: al.active or not al.employee_id.active)
        )
        allocations_per_employee_type = defaultdict(
            lambda: defaultdict(lambda: self.env["hr.leave.allocation"])
        )
        for allocation in allocations:
            allocations_per_employee_type[allocation.employee_id][
                allocation.holiday_status_id
            ] |= allocation

        # _get_consumed_leaves returns a tuple of two dictionaries.
        # 1) The first is a dictionary to map the number of days/hours of leaves taken per allocation
        # The structure is the following:
        # - KEYS:
        # allocations_leaves_consumed
        #  |--employee_id
        #      |--holiday_status_id
        #          |--allocation
        #              |--virtual_leaves_taken
        #              |--leaves_taken
        #              |--virtual_remaining_leaves
        #              |--remaining_leaves
        #              |--max_leaves
        #              |--accrual_bonus
        # - VALUES:
        # Number of (virtual) remaining leaves, (virtual) leaves taken or max leaves
        # for each allocation.
        # leaves_taken and remaining_leaves only take into account validated leaves, while the "virtual" equivalents are
        # also based on leaves in "confirm" or "validate1" state.
        # Accrual bonus gives the amount of additional leaves that will have been granted at the given
        # target_date in comparison to today.
        # The unit is in hours or days depending on the leave type request unit.
        # 2) The second is a dictionary mapping the remaining days per employee and per leave type that are
        # not taken into account by the allocations, mainly because accruals don't take future leaves into account.
        # This is used to warn the user if the leaves they take bring them above their available limit.
        # - KEYS:
        # to_recheck_leaves_per_leave_type
        #  |--employee_id
        #      |--holiday_status_id
        #          |--to_recheck_leaves
        #          |--excess_days
        #          |--exceeding_duration
        # - VALUES:
        # "to_recheck_leaves" stores every leave that is not yet taken into account by the "allocations_leaves_consumed" dictionary.
        # "excess_days" maps a leave end date to the excess amount that isn't taken into account by the first dictionary.
        # "exceeding_duration" sums up the to_recheck_leaves duration and compares it to the maximum allocated for that time period.
        allocations_leaves_consumed = defaultdict(
            lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: 0)))
        )
        to_recheck_leaves_per_leave_type = defaultdict(
            lambda: defaultdict(
                lambda: {
                    "excess_days": defaultdict(
                        lambda: {
                            "amount": 0,
                            "is_virtual": True,
                        }
                    ),
                    "exceeding_duration": 0,
                    "to_recheck_leaves": self.env["hr.leave"],
                }
            )
        )

        # Initialise every allocation with its full capacity, including what
        # an accrual allocation will have granted by target_date.
        for allocation in allocations:
            allocation_data = allocations_leaves_consumed[allocation.employee_id][
                allocation.holiday_status_id
            ][allocation]
            future_leaves = 0
            if allocation.allocation_type == "accrual":
                future_leaves = allocation._get_future_leaves_on(target_date)
            max_leaves = (
                allocation.number_of_hours_display
                if allocation.type_request_unit in ["hour"]
                else allocation.number_of_days_display
            )
            max_leaves += future_leaves
            allocation_data.update(
                {
                    "max_leaves": max_leaves,
                    "accrual_bonus": future_leaves,
                    "virtual_remaining_leaves": max_leaves,
                    "remaining_leaves": max_leaves,
                    "leaves_taken": 0,
                    "virtual_leaves_taken": 0,
                }
            )

        for employee in employees:
            for leave_type in leave_types:
                # Allocations with an end date are consumed first, earliest
                # end date first; open-ended allocations come last.
                allocations_with_date_to = self.env["hr.leave.allocation"]
                allocations_without_date_to = self.env["hr.leave.allocation"]
                for leave_allocation in allocations_per_employee_type[employee][
                    leave_type
                ]:
                    if leave_allocation.date_to:
                        allocations_with_date_to |= leave_allocation
                    else:
                        allocations_without_date_to |= leave_allocation
                sorted_leave_allocations = (
                    allocations_with_date_to.sorted(key="date_to")
                    + allocations_without_date_to
                )
                # Loop-invariant: whether any allocation of this type accrues.
                has_accrual_allocation = bool(
                    sorted_leave_allocations.filtered(
                        lambda alloc: alloc.allocation_type == "accrual"
                    )
                )
                if leave_type.request_unit in ["c_day", "day", "half_day"]:
                    leave_duration_field = "number_of_days"
                    leave_unit = "days"
                else:
                    leave_duration_field = "number_of_hours_display"
                    leave_unit = "hours"
                leave_type_data = allocations_leaves_consumed[employee][leave_type]
                for leave in leaves_per_employee_type[employee][leave_type].sorted(
                    "date_from"
                ):
                    leave_duration = leave[leave_duration_field]
                    if (
                        has_accrual_allocation
                        and leave.date_from.date() > target_date
                    ):
                        # Future leave on an accrual type: set aside and
                        # checked against the projected accrual further down.
                        to_recheck_leaves_per_leave_type[employee][leave_type][
                            "to_recheck_leaves"
                        ] |= leave
                        continue
                    if leave_type.requires_allocation == "yes":
                        for allocation in sorted_leave_allocations:
                            # We don't want to include future leaves linked to accruals into the total count of available leaves.
                            # However, we'll need to check if those leaves take more than what will be accrued in total of those days
                            # to give a warning if the total exceeds what will be accrued.
                            if allocation.date_from > leave.date_to.date() or (
                                allocation.date_to
                                and allocation.date_to < leave.date_from.date()
                            ):
                                continue
                            # Part of the leave that overlaps the allocation's
                            # validity period.
                            interval_start = max(
                                leave.date_from,
                                datetime.combine(allocation.date_from, time.min),
                            )
                            interval_end = min(
                                leave.date_to,
                                datetime.combine(allocation.date_to, time.max)
                                if allocation.date_to
                                else leave.date_to,
                            )
                            duration = leave[leave_duration_field]
                            if (
                                leave.date_from != interval_start
                                or leave.date_to != interval_end
                            ):
                                # Partial overlap: recompute the duration on
                                # the overlapping interval only.
                                duration_info = employee._get_calendar_attendances(
                                    interval_start.replace(tzinfo=pytz.UTC),
                                    interval_end.replace(tzinfo=pytz.UTC),
                                )
                                duration = duration_info[
                                    "hours" if leave_unit == "hours" else "days"
                                ]
                            max_allowed_duration = min(
                                duration,
                                leave_type_data[allocation]["virtual_remaining_leaves"],
                            )
                            if not max_allowed_duration:
                                continue
                            allocated_time = min(max_allowed_duration, leave_duration)
                            leave_type_data[allocation][
                                "virtual_leaves_taken"
                            ] += allocated_time
                            leave_type_data[allocation][
                                "virtual_remaining_leaves"
                            ] -= allocated_time
                            if leave.state == "validate":
                                leave_type_data[allocation][
                                    "leaves_taken"
                                ] += allocated_time
                                leave_type_data[allocation][
                                    "remaining_leaves"
                                ] -= allocated_time
                            leave_duration -= allocated_time
                            if not leave_duration:
                                break
                        # Whatever could not be placed on any allocation is
                        # recorded as excess, keyed by the leave's end date.
                        if round(leave_duration, 2) > 0:
                            to_recheck_leaves_per_leave_type[employee][leave_type][
                                "excess_days"
                            ][leave.date_to.date()] = {
                                "amount": leave_duration,
                                "is_virtual": leave.state != "validate",
                                "leave_id": leave.id,
                            }
                    else:
                        # No allocation required: totals are stored under the
                        # ``False`` key instead of an allocation record.
                        if leave_unit == "hours":
                            allocated_time = leave.number_of_hours_display
                        else:
                            allocated_time = leave.number_of_days_display
                        leave_type_data[False]["virtual_leaves_taken"] += allocated_time
                        leave_type_data[False]["virtual_remaining_leaves"] = 0
                        leave_type_data[False]["remaining_leaves"] = 0
                        if leave.state == "validate":
                            leave_type_data[False]["leaves_taken"] += allocated_time

        for employee in to_recheck_leaves_per_leave_type:
            for leave_type in to_recheck_leaves_per_leave_type[employee]:
                content = to_recheck_leaves_per_leave_type[employee][leave_type]
                consumed_content = allocations_leaves_consumed[employee][leave_type]
                if content["to_recheck_leaves"]:
                    date_to_simulate = max(
                        content["to_recheck_leaves"].mapped("date_from")
                    ).date()
                    latest_accrual_bonus = 0
                    date_accrual_bonus = 0
                    virtual_remaining = 0
                    additional_leaves_duration = 0
                    for allocation in consumed_content:
                        # ``allocation`` may be the ``False`` key, in which
                        # case the ``and`` short-circuits and nothing is added.
                        latest_accrual_bonus += (
                            allocation
                            and allocation._get_future_leaves_on(date_to_simulate)
                        )
                        date_accrual_bonus += consumed_content[allocation][
                            "accrual_bonus"
                        ]
                        virtual_remaining += consumed_content[allocation][
                            "virtual_remaining_leaves"
                        ]
                    # Sum the future leaves in the same unit as the balances
                    # above, using the same unit rule and duration fields as
                    # the main loop (the former ``== "hours"`` test never
                    # matched the ``"hour"`` unit checked on allocations, so
                    # hour-based types were summed in days).
                    counted_in_days = leave_type.request_unit in [
                        "c_day",
                        "day",
                        "half_day",
                    ]
                    for leave in content["to_recheck_leaves"]:
                        additional_leaves_duration += (
                            leave.number_of_days
                            if counted_in_days
                            else leave.number_of_hours_display
                        )
                    latest_remaining = (
                        virtual_remaining - date_accrual_bonus + latest_accrual_bonus
                    )
                    # Zero when the future leaves fit, negative otherwise.
                    content["exceeding_duration"] = round(
                        min(0, latest_remaining - additional_leaves_duration), 2
                    )
        return (allocations_leaves_consumed, to_recheck_leaves_per_leave_type)
class ResourceMixin(models.AbstractModel):
    """Extension of ``resource.mixin`` adding a calendar-day duration helper."""

    _inherit = "resource.mixin"

    def get_all_days_data(self, date_from, date_to, exclude_holidays):
        """Count the calendar days spanned by a period, optionally minus holidays.

        Both bounds are inclusive: a period starting and ending on the same
        date counts as one day.

        :param date_from: start of the period (datetime, naive or aware).
        :param date_to: end of the period (datetime, naive or aware).
        :param exclude_holidays: when truthy, the duration returned by
            ``get_holidays_duration`` on the current user's company calendar
            is subtracted.
        :return: tuple ``(days, hours)`` where ``days`` is a float and
            ``hours`` is ``days * HOURS_PER_DAY``.
        """
        # Naive datetimes are interpreted in the context timezone, then the
        # record's timezone, then UTC. ``timezone()`` must not be called with
        # an empty name: it raises instead of returning a falsy value.
        tz_name = self.env.context.get("tz") or self.tz
        user_tz = timezone(tz_name) if tz_name else utc
        if not date_from.tzinfo:
            date_from = user_tz.localize(date_from)
        if not date_to.tzinfo:
            date_to = user_tz.localize(date_to)

        # Bring both dates to the same timezone
        if date_from.tzinfo != date_to.tzinfo:
            date_to = date_to.astimezone(date_from.tzinfo)

        # Handle holidays in selected period
        holidays_dur = timedelta(0)
        if exclude_holidays:
            holidays_dur = (
                self.env.user.company_id.resource_calendar_id.get_holidays_duration(
                    date_from, date_to
                )
            )

        # ``+ timedelta(1)`` makes the end date inclusive.
        duration = date_to.date() - date_from.date() + timedelta(1) - holidays_dur
        days = float(duration.days)
        hours = days * HOURS_PER_DAY
        return (days, hours)
class ResourceCalendar(models.Model):
    """Extension of ``resource.calendar`` that measures holidays in a period."""

    _inherit = "resource.calendar"

    def get_holidays_duration(self, start_dt, end_dt):
        """Return the holidays falling inside a period as a ``timedelta``.

        Every ``holidays.calendar.leaves`` record whose ``type_transfer_day``
        is ``"is_holiday"`` is intersected with the period; each calendar
        date covered by at least one of them is counted once.

        :param start_dt: start of the period (datetime).
        :param end_dt: end of the period (datetime).
        :return: ``timedelta`` whose number of days is the count of distinct
            holiday dates inside the period.
        """
        holidays = self.env["holidays.calendar.leaves"].search(
            [("type_transfer_day", "=", "is_holiday")]
        )
        if not holidays:
            return timedelta(0)

        # Holiday bounds are built as UTC datetimes below, so the period is
        # compared in UTC as well (done once instead of per holiday).
        # NOTE(review): the calendar dates of the period are taken after this
        # conversion, so a bound close to local midnight can land on a
        # neighbouring date - confirm this matches the callers' expectations.
        start_dt = start_dt.astimezone(utc)
        end_dt = end_dt.astimezone(utc)
        period_first_day = start_dt.date()
        period_last_day = end_dt.date()
        end_of_day = time(23, 59, 59)

        days = set()
        for holiday in holidays:
            # A missing bound leaves the holiday open-ended on that side
            # rather than failing on attribute access of an empty value.
            holiday_date_from = (
                utc.localize(datetime.combine(holiday.date_from, time.min))
                if holiday.date_from
                else None
            )
            holiday_date_to = (
                utc.localize(datetime.combine(holiday.date_to, end_of_day))
                if holiday.date_to
                else None
            )

            # Skip holidays that do not intersect the period
            if (holiday_date_from and end_dt < holiday_date_from) or (
                holiday_date_to and start_dt > holiday_date_to
            ):
                continue

            # Clamp the holiday to the period and collect each covered date
            start = period_first_day
            if holiday_date_from:
                start = max(start, holiday_date_from.date())
            until = period_last_day
            if holiday_date_to:
                until = min(until, holiday_date_to.date())
            for offset in range((until - start).days + 1):
                days.add(start + timedelta(offset))
        return timedelta(len(days))