749 lines
35 KiB
Python
749 lines
35 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
|
|
|
import itertools
|
|
|
|
from collections import defaultdict
|
|
from datetime import datetime, timedelta
|
|
from functools import partial
|
|
from itertools import chain
|
|
|
|
from dateutil.relativedelta import relativedelta
|
|
from dateutil.rrule import rrule, DAILY
|
|
from pytz import timezone, utc
|
|
|
|
from odoo import api, fields, models, _
|
|
from odoo.addons.base.models.res_partner import _tz_get
|
|
from odoo.exceptions import ValidationError
|
|
from odoo.osv import expression
|
|
from odoo.tools.float_utils import float_round
|
|
|
|
from odoo.tools import date_utils, float_utils
|
|
from .utils import Intervals, float_to_time, make_aware, datetime_to_string, string_to_datetime, ROUNDING_FACTOR
|
|
|
|
|
|
class ResourceCalendar(models.Model):
|
|
""" Calendar model for a resource. It has
|
|
|
|
- attendance_ids: list of resource.calendar.attendance that are a working
|
|
interval in a given weekday.
|
|
- leave_ids: list of leaves linked to this calendar. A leave can be general
|
|
or linked to a specific resource, depending on its resource_id.
|
|
|
|
All methods in this class use intervals. An interval is a tuple holding
|
|
(begin_datetime, end_datetime). A list of intervals is therefore a list of
|
|
tuples, holding several intervals of work or leaves. """
|
|
_name = "resource.calendar"
|
|
_description = "Resource Working Time"
|
|
|
|
@api.model
|
|
def default_get(self, fields):
|
|
res = super().default_get(fields)
|
|
if not res.get('name') and res.get('company_id'):
|
|
res['name'] = _('Working Hours of %s', self.env['res.company'].browse(res['company_id']).name)
|
|
if 'attendance_ids' in fields and not res.get('attendance_ids'):
|
|
company_id = res.get('company_id', self.env.company.id)
|
|
company = self.env['res.company'].browse(company_id)
|
|
company_attendance_ids = company.resource_calendar_id.attendance_ids
|
|
if not company.resource_calendar_id.two_weeks_calendar and company_attendance_ids:
|
|
res['attendance_ids'] = [
|
|
(0, 0, {
|
|
'name': attendance.name,
|
|
'dayofweek': attendance.dayofweek,
|
|
'hour_from': attendance.hour_from,
|
|
'hour_to': attendance.hour_to,
|
|
'day_period': attendance.day_period,
|
|
})
|
|
for attendance in company_attendance_ids
|
|
]
|
|
else:
|
|
res['attendance_ids'] = [
|
|
(0, 0, {'name': _('Monday Morning'), 'dayofweek': '0', 'hour_from': 8, 'hour_to': 12, 'day_period': 'morning'}),
|
|
(0, 0, {'name': _('Monday Lunch'), 'dayofweek': '0', 'hour_from': 12, 'hour_to': 13, 'day_period': 'lunch'}),
|
|
(0, 0, {'name': _('Monday Afternoon'), 'dayofweek': '0', 'hour_from': 13, 'hour_to': 17, 'day_period': 'afternoon'}),
|
|
(0, 0, {'name': _('Tuesday Morning'), 'dayofweek': '1', 'hour_from': 8, 'hour_to': 12, 'day_period': 'morning'}),
|
|
(0, 0, {'name': _('Tuesday Lunch'), 'dayofweek': '1', 'hour_from': 12, 'hour_to': 13, 'day_period': 'lunch'}),
|
|
(0, 0, {'name': _('Tuesday Afternoon'), 'dayofweek': '1', 'hour_from': 13, 'hour_to': 17, 'day_period': 'afternoon'}),
|
|
(0, 0, {'name': _('Wednesday Morning'), 'dayofweek': '2', 'hour_from': 8, 'hour_to': 12, 'day_period': 'morning'}),
|
|
(0, 0, {'name': _('Wednesday Lunch'), 'dayofweek': '2', 'hour_from': 12, 'hour_to': 13, 'day_period': 'lunch'}),
|
|
(0, 0, {'name': _('Wednesday Afternoon'), 'dayofweek': '2', 'hour_from': 13, 'hour_to': 17, 'day_period': 'afternoon'}),
|
|
(0, 0, {'name': _('Thursday Morning'), 'dayofweek': '3', 'hour_from': 8, 'hour_to': 12, 'day_period': 'morning'}),
|
|
(0, 0, {'name': _('Thursday Lunch'), 'dayofweek': '3', 'hour_from': 12, 'hour_to': 13, 'day_period': 'lunch'}),
|
|
(0, 0, {'name': _('Thursday Afternoon'), 'dayofweek': '3', 'hour_from': 13, 'hour_to': 17, 'day_period': 'afternoon'}),
|
|
(0, 0, {'name': _('Friday Morning'), 'dayofweek': '4', 'hour_from': 8, 'hour_to': 12, 'day_period': 'morning'}),
|
|
(0, 0, {'name': _('Friday Lunch'), 'dayofweek': '4', 'hour_from': 12, 'hour_to': 13, 'day_period': 'lunch'}),
|
|
(0, 0, {'name': _('Friday Afternoon'), 'dayofweek': '4', 'hour_from': 13, 'hour_to': 17, 'day_period': 'afternoon'})
|
|
]
|
|
return res
|
|
|
|
name = fields.Char(required=True)
|
|
active = fields.Boolean("Active", default=True,
|
|
help="If the active field is set to false, it will allow you to hide the Working Time without removing it.")
|
|
company_id = fields.Many2one(
|
|
'res.company', 'Company', domain=lambda self: [('id', 'in', self.env.companies.ids)],
|
|
default=lambda self: self.env.company)
|
|
attendance_ids = fields.One2many(
|
|
'resource.calendar.attendance', 'calendar_id', 'Working Time',
|
|
compute='_compute_attendance_ids', store=True, readonly=False, copy=True)
|
|
leave_ids = fields.One2many(
|
|
'resource.calendar.leaves', 'calendar_id', 'Time Off')
|
|
global_leave_ids = fields.One2many(
|
|
'resource.calendar.leaves', 'calendar_id', 'Global Time Off',
|
|
compute='_compute_global_leave_ids', store=True, readonly=False,
|
|
domain=[('resource_id', '=', False)], copy=True,
|
|
)
|
|
hours_per_day = fields.Float("Average Hour per Day", store=True, compute="_compute_hours_per_day", digits=(2, 2),
|
|
help="Average hours per day a resource is supposed to work with this calendar.")
|
|
tz = fields.Selection(
|
|
_tz_get, string='Timezone', required=True,
|
|
default=lambda self: self._context.get('tz') or self.env.user.tz or self.env.ref('base.user_admin').tz or 'UTC',
|
|
help="This field is used in order to define in which timezone the resources will work.")
|
|
tz_offset = fields.Char(compute='_compute_tz_offset', string='Timezone offset')
|
|
two_weeks_calendar = fields.Boolean(string="Calendar in 2 weeks mode")
|
|
two_weeks_explanation = fields.Char('Explanation', compute="_compute_two_weeks_explanation")
|
|
|
|
@api.depends('attendance_ids', 'attendance_ids.hour_from', 'attendance_ids.hour_to', 'two_weeks_calendar')
|
|
def _compute_hours_per_day(self):
|
|
for calendar in self:
|
|
attendances = calendar._get_global_attendances()
|
|
calendar.hours_per_day = calendar._get_hours_per_day(attendances)
|
|
|
|
@api.depends('company_id')
|
|
def _compute_attendance_ids(self):
|
|
for calendar in self.filtered(lambda c: not c._origin or c._origin.company_id != c.company_id and c.company_id):
|
|
company_calendar = calendar.company_id.resource_calendar_id
|
|
calendar.update({
|
|
'two_weeks_calendar': company_calendar.two_weeks_calendar,
|
|
'tz': company_calendar.tz,
|
|
'attendance_ids': [(5, 0, 0)] + [
|
|
(0, 0, attendance._copy_attendance_vals()) for attendance in company_calendar.attendance_ids if not attendance.resource_id]
|
|
})
|
|
|
|
@api.depends('company_id')
|
|
def _compute_global_leave_ids(self):
|
|
for calendar in self.filtered(lambda c: not c._origin or c._origin.company_id != c.company_id):
|
|
calendar.update({
|
|
'global_leave_ids': [(5, 0, 0)] + [
|
|
(0, 0, leave._copy_leave_vals()) for leave in calendar.company_id.resource_calendar_id.global_leave_ids]
|
|
})
|
|
|
|
@api.depends('tz')
|
|
def _compute_tz_offset(self):
|
|
for calendar in self:
|
|
calendar.tz_offset = datetime.now(timezone(calendar.tz or 'GMT')).strftime('%z')
|
|
|
|
@api.returns('self', lambda value: value.id)
|
|
def copy(self, default=None):
|
|
self.ensure_one()
|
|
if default is None:
|
|
default = {}
|
|
if not default.get('name'):
|
|
default['name'] = _('%s (copy)', self.name)
|
|
return super().copy(default)
|
|
|
|
@api.constrains('attendance_ids')
|
|
def _check_attendance_ids(self):
|
|
for resource in self:
|
|
if (resource.two_weeks_calendar and
|
|
resource.attendance_ids.filtered(lambda a: a.display_type == 'line_section') and
|
|
not resource.attendance_ids.sorted('sequence')[0].display_type):
|
|
raise ValidationError(_("In a calendar with 2 weeks mode, all periods need to be in the sections."))
|
|
|
|
@api.depends('two_weeks_calendar')
|
|
def _compute_two_weeks_explanation(self):
|
|
today = fields.Date.today()
|
|
week_type = self.env['resource.calendar.attendance'].get_week_type(today)
|
|
week_type_str = _("second") if week_type else _("first")
|
|
first_day = date_utils.start_of(today, 'week')
|
|
last_day = date_utils.end_of(today, 'week')
|
|
self.two_weeks_explanation = _("The current week (from %s to %s) correspond to the %s one.", first_day,
|
|
last_day, week_type_str)
|
|
|
|
def _get_global_attendances(self):
|
|
return self.attendance_ids.filtered(lambda attendance:
|
|
attendance.day_period != 'lunch'
|
|
and not attendance.date_from and not attendance.date_to
|
|
and not attendance.resource_id and not attendance.display_type)
|
|
|
|
def _get_hours_per_day(self, attendances):
|
|
"""
|
|
Calculate the average hours worked per workday.
|
|
"""
|
|
if not attendances:
|
|
return 0
|
|
|
|
hour_count = 0.0
|
|
for attendance in attendances:
|
|
hour_count += attendance.hour_to - attendance.hour_from
|
|
|
|
if self.two_weeks_calendar:
|
|
number_of_days = len(set(attendances.filtered(lambda cal: cal.week_type == '1').mapped('dayofweek')))
|
|
number_of_days += len(set(attendances.filtered(lambda cal: cal.week_type == '0').mapped('dayofweek')))
|
|
else:
|
|
number_of_days = len(set(attendances.mapped('dayofweek')))
|
|
|
|
if not number_of_days:
|
|
return 0
|
|
|
|
return float_round(hour_count / float(number_of_days), precision_digits=2)
|
|
|
|
def switch_calendar_type(self):
|
|
if not self.two_weeks_calendar:
|
|
self.attendance_ids.unlink()
|
|
self.attendance_ids = [
|
|
(0, 0, {
|
|
'name': 'First week',
|
|
'dayofweek': '0',
|
|
'sequence': '0',
|
|
'hour_from': 0,
|
|
'day_period': 'morning',
|
|
'week_type': '0',
|
|
'hour_to': 0,
|
|
'display_type':
|
|
'line_section'}),
|
|
(0, 0, {
|
|
'name': 'Second week',
|
|
'dayofweek': '0',
|
|
'sequence': '25',
|
|
'hour_from': 0,
|
|
'day_period': 'morning',
|
|
'week_type': '1',
|
|
'hour_to': 0,
|
|
'display_type': 'line_section'}),
|
|
]
|
|
|
|
self.two_weeks_calendar = True
|
|
default_attendance = self.default_get('attendance_ids')['attendance_ids']
|
|
for idx, att in enumerate(default_attendance):
|
|
att[2]["week_type"] = '0'
|
|
att[2]["sequence"] = idx + 1
|
|
self.attendance_ids = default_attendance
|
|
for idx, att in enumerate(default_attendance):
|
|
att[2]["week_type"] = '1'
|
|
att[2]["sequence"] = idx + 26
|
|
self.attendance_ids = default_attendance
|
|
else:
|
|
self.two_weeks_calendar = False
|
|
self.attendance_ids.unlink()
|
|
self.attendance_ids = self.default_get('attendance_ids')['attendance_ids']
|
|
|
|
@api.onchange('attendance_ids')
|
|
def _onchange_attendance_ids(self):
|
|
if not self.two_weeks_calendar:
|
|
return
|
|
|
|
even_week_seq = self.attendance_ids.filtered(lambda att: att.display_type == 'line_section' and att.week_type == '0')
|
|
odd_week_seq = self.attendance_ids.filtered(lambda att: att.display_type == 'line_section' and att.week_type == '1')
|
|
if len(even_week_seq) != 1 or len(odd_week_seq) != 1:
|
|
raise ValidationError(_("You can't delete section between weeks."))
|
|
|
|
even_week_seq = even_week_seq.sequence
|
|
odd_week_seq = odd_week_seq.sequence
|
|
|
|
for line in self.attendance_ids.filtered(lambda att: att.display_type is False):
|
|
if even_week_seq > odd_week_seq:
|
|
line.week_type = '1' if even_week_seq > line.sequence else '0'
|
|
else:
|
|
line.week_type = '0' if odd_week_seq > line.sequence else '1'
|
|
|
|
def _check_overlap(self, attendance_ids):
|
|
""" attendance_ids correspond to attendance of a week,
|
|
will check for each day of week that there are no superimpose. """
|
|
result = []
|
|
for attendance in attendance_ids.filtered(lambda att: not att.date_from and not att.date_to):
|
|
# 0.000001 is added to each start hour to avoid to detect two contiguous intervals as superimposing.
|
|
# Indeed Intervals function will join 2 intervals with the start and stop hour corresponding.
|
|
result.append((int(attendance.dayofweek) * 24 + attendance.hour_from + 0.000001, int(attendance.dayofweek) * 24 + attendance.hour_to, attendance))
|
|
|
|
if len(Intervals(result)) != len(result):
|
|
raise ValidationError(_("Attendances can't overlap."))
|
|
|
|
@api.constrains('attendance_ids')
|
|
def _check_attendance(self):
|
|
# Avoid superimpose in attendance
|
|
for calendar in self:
|
|
attendance_ids = calendar.attendance_ids.filtered(lambda attendance: not attendance.resource_id and attendance.display_type is False)
|
|
if calendar.two_weeks_calendar:
|
|
calendar._check_overlap(attendance_ids.filtered(lambda attendance: attendance.week_type == '0'))
|
|
calendar._check_overlap(attendance_ids.filtered(lambda attendance: attendance.week_type == '1'))
|
|
else:
|
|
calendar._check_overlap(attendance_ids)
|
|
|
|
# --------------------------------------------------
|
|
# Computation API
|
|
# --------------------------------------------------
|
|
|
|
def _attendance_intervals_batch(self, start_dt, end_dt, resources=None, domain=None, tz=None, lunch=False):
|
|
assert start_dt.tzinfo and end_dt.tzinfo
|
|
self.ensure_one()
|
|
if not resources:
|
|
resources = self.env['resource.resource']
|
|
resources_list = [resources]
|
|
else:
|
|
resources_list = list(resources) + [self.env['resource.resource']]
|
|
resource_ids = [r.id for r in resources_list]
|
|
domain = domain if domain is not None else []
|
|
domain = expression.AND([domain, [
|
|
('calendar_id', '=', self.id),
|
|
('resource_id', 'in', resource_ids),
|
|
('display_type', '=', False),
|
|
('day_period', '!=' if not lunch else '=', 'lunch'),
|
|
]])
|
|
|
|
attendances = self.env['resource.calendar.attendance'].search(domain)
|
|
# Since we only have one calendar to take in account
|
|
# Group resources per tz they will all have the same result
|
|
resources_per_tz = defaultdict(list)
|
|
for resource in resources_list:
|
|
resources_per_tz[tz or timezone((resource or self).tz)].append(resource)
|
|
# Resource specific attendances
|
|
attendance_per_resource = defaultdict(lambda: self.env['resource.calendar.attendance'])
|
|
# Calendar attendances per day of the week
|
|
# * 7 days per week * 2 for two week calendars
|
|
attendances_per_day = [self.env['resource.calendar.attendance']] * 7 * 2
|
|
weekdays = set()
|
|
for attendance in attendances:
|
|
if attendance.resource_id:
|
|
attendance_per_resource[attendance.resource_id] |= attendance
|
|
weekday = int(attendance.dayofweek)
|
|
weekdays.add(weekday)
|
|
if self.two_weeks_calendar:
|
|
weektype = int(attendance.week_type)
|
|
attendances_per_day[weekday + 7 * weektype] |= attendance
|
|
else:
|
|
attendances_per_day[weekday] |= attendance
|
|
attendances_per_day[weekday + 7] |= attendance
|
|
|
|
start = start_dt.astimezone(utc)
|
|
end = end_dt.astimezone(utc)
|
|
bounds_per_tz = {
|
|
tz: (start_dt.astimezone(tz), end_dt.astimezone(tz))
|
|
for tz in resources_per_tz.keys()
|
|
}
|
|
# Use the outer bounds from the requested timezones
|
|
for tz, bounds in bounds_per_tz.items():
|
|
start = min(start, bounds[0].replace(tzinfo=utc))
|
|
end = max(end, bounds[1].replace(tzinfo=utc))
|
|
# Generate once with utc as timezone
|
|
days = rrule(DAILY, start.date(), until=end.date(), byweekday=weekdays)
|
|
ResourceCalendarAttendance = self.env['resource.calendar.attendance']
|
|
base_result = []
|
|
per_resource_result = defaultdict(list)
|
|
for day in days:
|
|
week_type = ResourceCalendarAttendance.get_week_type(day)
|
|
attendances = attendances_per_day[day.weekday() + 7 * week_type]
|
|
for attendance in attendances:
|
|
if (attendance.date_from and day.date() < attendance.date_from) or\
|
|
(attendance.date_to and attendance.date_to < day.date()):
|
|
continue
|
|
day_from = datetime.combine(day, float_to_time(attendance.hour_from))
|
|
day_to = datetime.combine(day, float_to_time(attendance.hour_to))
|
|
if attendance.resource_id:
|
|
per_resource_result[attendance.resource_id].append((day_from, day_to, attendance))
|
|
else:
|
|
base_result.append((day_from, day_to, attendance))
|
|
|
|
|
|
# Copy the result localized once per necessary timezone
|
|
# Strictly speaking comparing start_dt < time or start_dt.astimezone(tz) < time
|
|
# should always yield the same result. however while working with dates it is easier
|
|
# if all dates have the same format
|
|
result_per_tz = {
|
|
tz: [(max(bounds_per_tz[tz][0], tz.localize(val[0])),
|
|
min(bounds_per_tz[tz][1], tz.localize(val[1])),
|
|
val[2])
|
|
for val in base_result]
|
|
for tz in resources_per_tz.keys()
|
|
}
|
|
result_per_resource_id = dict()
|
|
for tz, resources in resources_per_tz.items():
|
|
res = result_per_tz[tz]
|
|
res_intervals = Intervals(res)
|
|
for resource in resources:
|
|
if resource in per_resource_result:
|
|
resource_specific_result = [(max(bounds_per_tz[tz][0], tz.localize(val[0])), min(bounds_per_tz[tz][1], tz.localize(val[1])), val[2])
|
|
for val in per_resource_result[resource]]
|
|
result_per_resource_id[resource.id] = Intervals(itertools.chain(res, resource_specific_result))
|
|
else:
|
|
result_per_resource_id[resource.id] = res_intervals
|
|
return result_per_resource_id
|
|
|
|
def _leave_intervals(self, start_dt, end_dt, resource=None, domain=None, tz=None):
|
|
if resource is None:
|
|
resource = self.env['resource.resource']
|
|
return self._leave_intervals_batch(
|
|
start_dt, end_dt, resources=resource, domain=domain, tz=tz
|
|
)[resource.id]
|
|
|
|
def _leave_intervals_batch(self, start_dt, end_dt, resources=None, domain=None, tz=None, any_calendar=False):
|
|
""" Return the leave intervals in the given datetime range.
|
|
The returned intervals are expressed in specified tz or in the calendar's timezone.
|
|
"""
|
|
assert start_dt.tzinfo and end_dt.tzinfo
|
|
self.ensure_one()
|
|
|
|
if not resources:
|
|
resources = self.env['resource.resource']
|
|
resources_list = [resources]
|
|
else:
|
|
resources_list = list(resources) + [self.env['resource.resource']]
|
|
if domain is None:
|
|
domain = [('time_type', '=', 'leave')]
|
|
if not any_calendar:
|
|
domain = domain + [('calendar_id', 'in', [False, self.id])]
|
|
# for the computation, express all datetimes in UTC
|
|
# Public leave don't have a resource_id
|
|
domain = domain + [
|
|
('resource_id', 'in', [False] + [r.id for r in resources_list]),
|
|
('date_from', '<=', datetime_to_string(end_dt)),
|
|
('date_to', '>=', datetime_to_string(start_dt)),
|
|
]
|
|
|
|
# retrieve leave intervals in (start_dt, end_dt)
|
|
result = defaultdict(lambda: [])
|
|
tz_dates = {}
|
|
all_leaves = self.env['resource.calendar.leaves'].search(domain)
|
|
for leave in all_leaves:
|
|
leave_resource = leave.resource_id
|
|
leave_company = leave.company_id
|
|
leave_date_from = leave.date_from
|
|
leave_date_to = leave.date_to
|
|
for resource in resources_list:
|
|
if leave_resource.id not in [False, resource.id] or (not leave_resource and resource and resource.company_id != leave_company):
|
|
continue
|
|
tz = tz if tz else timezone((resource or self).tz)
|
|
if (tz, start_dt) in tz_dates:
|
|
start = tz_dates[(tz, start_dt)]
|
|
else:
|
|
start = start_dt.astimezone(tz)
|
|
tz_dates[(tz, start_dt)] = start
|
|
if (tz, end_dt) in tz_dates:
|
|
end = tz_dates[(tz, end_dt)]
|
|
else:
|
|
end = end_dt.astimezone(tz)
|
|
tz_dates[(tz, end_dt)] = end
|
|
dt0 = string_to_datetime(leave_date_from).astimezone(tz)
|
|
dt1 = string_to_datetime(leave_date_to).astimezone(tz)
|
|
result[resource.id].append((max(start, dt0), min(end, dt1), leave))
|
|
|
|
return {r.id: Intervals(result[r.id]) for r in resources_list}
|
|
|
|
def _work_intervals_batch(self, start_dt, end_dt, resources=None, domain=None, tz=None, compute_leaves=True):
|
|
""" Return the effective work intervals between the given datetimes. """
|
|
if not resources:
|
|
resources = self.env['resource.resource']
|
|
resources_list = [resources]
|
|
else:
|
|
resources_list = list(resources) + [self.env['resource.resource']]
|
|
|
|
attendance_intervals = self._attendance_intervals_batch(start_dt, end_dt, resources, tz=tz or self.env.context.get("employee_timezone"))
|
|
if compute_leaves:
|
|
leave_intervals = self._leave_intervals_batch(start_dt, end_dt, resources, domain, tz=tz)
|
|
return {
|
|
r.id: (attendance_intervals[r.id] - leave_intervals[r.id]) for r in resources_list
|
|
}
|
|
else:
|
|
return {
|
|
r.id: attendance_intervals[r.id] for r in resources_list
|
|
}
|
|
|
|
def _unavailable_intervals(self, start_dt, end_dt, resource=None, domain=None, tz=None):
|
|
if resource is None:
|
|
resource = self.env['resource.resource']
|
|
return self._unavailable_intervals_batch(
|
|
start_dt, end_dt, resources=resource, domain=domain, tz=tz
|
|
)[resource.id]
|
|
|
|
def _unavailable_intervals_batch(self, start_dt, end_dt, resources=None, domain=None, tz=None):
|
|
""" Return the unavailable intervals between the given datetimes. """
|
|
if not resources:
|
|
resources = self.env['resource.resource']
|
|
resources_list = [resources]
|
|
else:
|
|
resources_list = list(resources)
|
|
|
|
resources_work_intervals = self._work_intervals_batch(start_dt, end_dt, resources, domain, tz)
|
|
result = {}
|
|
for resource in resources_list:
|
|
work_intervals = [(start, stop) for start, stop, meta in resources_work_intervals[resource.id]]
|
|
# start + flatten(intervals) + end
|
|
work_intervals = [start_dt] + list(chain.from_iterable(work_intervals)) + [end_dt]
|
|
# put it back to UTC
|
|
work_intervals = list(map(lambda dt: dt.astimezone(utc), work_intervals))
|
|
# pick groups of two
|
|
work_intervals = list(zip(work_intervals[0::2], work_intervals[1::2]))
|
|
result[resource.id] = work_intervals
|
|
return result
|
|
|
|
# --------------------------------------------------
|
|
# Private Methods / Helpers
|
|
# --------------------------------------------------
|
|
|
|
def _get_attendance_intervals_days_data(self, attendance_intervals):
|
|
"""
|
|
helper function to compute duration of `intervals` that have
|
|
'resource.calendar.attendance' records as payload (3rd element in tuple).
|
|
expressed in days and hours.
|
|
|
|
resource.calendar.attendance records have durations associated
|
|
with them so this method merely calculates the proportion that is
|
|
covered by the intervals.
|
|
"""
|
|
day_hours = defaultdict(float)
|
|
day_days = defaultdict(float)
|
|
for start, stop, meta in attendance_intervals:
|
|
# If the interval covers only a part of the original attendance, we
|
|
# take durations in days proportionally to what is left of the interval.
|
|
interval_hours = (stop - start).total_seconds() / 3600
|
|
day_hours[start.date()] += interval_hours
|
|
day_days[start.date()] += sum(meta.mapped('duration_days')) * interval_hours / sum(meta.mapped('duration_hours'))
|
|
|
|
return {
|
|
# Round the number of days to the closest 16th of a day.
|
|
'days': sum(float_utils.round(ROUNDING_FACTOR * day_days[day]) / ROUNDING_FACTOR for day in day_days),
|
|
'hours': sum(day_hours.values()),
|
|
}
|
|
|
|
def _get_days_data(self, intervals, day_total):
|
|
"""
|
|
helper function to compute duration of `intervals`
|
|
expressed in days and hours.
|
|
`day_total` is a dict {date: n_hours} with the number of hours for each day.
|
|
"""
|
|
day_hours = defaultdict(float)
|
|
for start, stop, meta in intervals:
|
|
day_hours[start.date()] += (stop - start).total_seconds() / 3600
|
|
|
|
# compute number of days as quarters
|
|
days = sum(
|
|
float_utils.round(ROUNDING_FACTOR * day_hours[day] / day_total[day]) / ROUNDING_FACTOR if day_total[day] else 0
|
|
for day in day_hours
|
|
)
|
|
return {
|
|
'days': days,
|
|
'hours': sum(day_hours.values()),
|
|
}
|
|
|
|
def _get_resources_day_total(self, from_datetime, to_datetime, resources=None):
|
|
"""
|
|
@return dict with hours of attendance in each day between `from_datetime` and `to_datetime`
|
|
"""
|
|
self.ensure_one()
|
|
if not resources:
|
|
resources = self.env['resource.resource']
|
|
resources_list = [resources]
|
|
else:
|
|
resources_list = list(resources) + [self.env['resource.resource']]
|
|
# total hours per day: retrieve attendances with one extra day margin,
|
|
# in order to compute the total hours on the first and last days
|
|
from_full = from_datetime - timedelta(days=1)
|
|
to_full = to_datetime + timedelta(days=1)
|
|
intervals = self._attendance_intervals_batch(from_full, to_full, resources=resources)
|
|
|
|
result = defaultdict(lambda: defaultdict(float))
|
|
for resource in resources_list:
|
|
day_total = result[resource.id]
|
|
for start, stop, meta in intervals[resource.id]:
|
|
day_total[start.date()] += (stop - start).total_seconds() / 3600
|
|
return result
|
|
|
|
def _get_closest_work_time(self, dt, match_end=False, resource=None, search_range=None, compute_leaves=True):
|
|
"""Return the closest work interval boundary within the search range.
|
|
Consider only starts of intervals unless `match_end` is True. It will then only consider
|
|
ends of intervals.
|
|
:param dt: reference datetime
|
|
:param match_end: wether to search for the begining of an interval or the end.
|
|
:param search_range: time interval considered. Defaults to the entire day of `dt`
|
|
:rtype: datetime | None
|
|
"""
|
|
def interval_dt(interval):
|
|
return interval[1 if match_end else 0]
|
|
|
|
tz = resource.tz if resource else self.tz
|
|
if resource is None:
|
|
resource = self.env['resource.resource']
|
|
|
|
if not dt.tzinfo or search_range and not (search_range[0].tzinfo and search_range[1].tzinfo):
|
|
raise ValueError('Provided datetimes needs to be timezoned')
|
|
|
|
dt = dt.astimezone(timezone(tz))
|
|
|
|
if not search_range:
|
|
range_start = dt + relativedelta(hour=0, minute=0, second=0)
|
|
range_end = dt + relativedelta(days=1, hour=0, minute=0, second=0)
|
|
else:
|
|
range_start, range_end = search_range
|
|
|
|
if not range_start <= dt <= range_end:
|
|
return None
|
|
work_intervals = sorted(
|
|
self._work_intervals_batch(range_start, range_end, resource, compute_leaves=compute_leaves)[resource.id],
|
|
key=lambda i: abs(interval_dt(i) - dt),
|
|
)
|
|
return interval_dt(work_intervals[0]) if work_intervals else None
|
|
|
|
def _get_unusual_days(self, start_dt, end_dt):
|
|
if not self:
|
|
return {}
|
|
self.ensure_one()
|
|
if not start_dt.tzinfo:
|
|
start_dt = start_dt.replace(tzinfo=utc)
|
|
if not end_dt.tzinfo:
|
|
end_dt = end_dt.replace(tzinfo=utc)
|
|
|
|
works = {d[0].date() for d in self._work_intervals_batch(start_dt, end_dt)[False]}
|
|
return {fields.Date.to_string(day.date()): (day.date() not in works) for day in rrule(DAILY, start_dt, until=end_dt)}
|
|
|
|
# --------------------------------------------------
|
|
# External API
|
|
# --------------------------------------------------
|
|
|
|
def get_work_hours_count(self, start_dt, end_dt, compute_leaves=True, domain=None):
|
|
"""
|
|
`compute_leaves` controls whether or not this method is taking into
|
|
account the global leaves.
|
|
|
|
`domain` controls the way leaves are recognized.
|
|
None means default value ('time_type', '=', 'leave')
|
|
|
|
Counts the number of work hours between two datetimes.
|
|
"""
|
|
self.ensure_one()
|
|
# Set timezone in UTC if no timezone is explicitly given
|
|
if not start_dt.tzinfo:
|
|
start_dt = start_dt.replace(tzinfo=utc)
|
|
if not end_dt.tzinfo:
|
|
end_dt = end_dt.replace(tzinfo=utc)
|
|
|
|
if compute_leaves:
|
|
intervals = self._work_intervals_batch(start_dt, end_dt, domain=domain)[False]
|
|
else:
|
|
intervals = self._attendance_intervals_batch(start_dt, end_dt)[False]
|
|
|
|
return sum(
|
|
(stop - start).total_seconds() / 3600
|
|
for start, stop, meta in intervals
|
|
)
|
|
|
|
def get_work_duration_data(self, from_datetime, to_datetime, compute_leaves=True, domain=None):
|
|
"""
|
|
Get the working duration (in days and hours) for a given period, only
|
|
based on the current calendar. This method does not use resource to
|
|
compute it.
|
|
|
|
`domain` is used in order to recognise the leaves to take,
|
|
None means default value ('time_type', '=', 'leave')
|
|
|
|
Returns a dict {'days': n, 'hours': h} containing the
|
|
quantity of working time expressed as days and as hours.
|
|
"""
|
|
# naive datetimes are made explicit in UTC
|
|
from_datetime, dummy = make_aware(from_datetime)
|
|
to_datetime, dummy = make_aware(to_datetime)
|
|
|
|
# actual hours per day
|
|
if compute_leaves:
|
|
intervals = self._work_intervals_batch(from_datetime, to_datetime, domain=domain)[False]
|
|
else:
|
|
intervals = self._attendance_intervals_batch(from_datetime, to_datetime, domain=domain)[False]
|
|
|
|
return self._get_attendance_intervals_days_data(intervals)
|
|
|
|
def plan_hours(self, hours, day_dt, compute_leaves=False, domain=None, resource=None):
|
|
"""
|
|
`compute_leaves` controls whether or not this method is taking into
|
|
account the global leaves.
|
|
|
|
`domain` controls the way leaves are recognized.
|
|
None means default value ('time_type', '=', 'leave')
|
|
|
|
Return datetime after having planned hours
|
|
"""
|
|
day_dt, revert = make_aware(day_dt)
|
|
|
|
if resource is None:
|
|
resource = self.env['resource.resource']
|
|
|
|
# which method to use for retrieving intervals
|
|
if compute_leaves:
|
|
get_intervals = partial(self._work_intervals_batch, domain=domain, resources=resource)
|
|
resource_id = resource.id
|
|
else:
|
|
get_intervals = self._attendance_intervals_batch
|
|
resource_id = False
|
|
|
|
if hours >= 0:
|
|
delta = timedelta(days=14)
|
|
for n in range(100):
|
|
dt = day_dt + delta * n
|
|
for start, stop, meta in get_intervals(dt, dt + delta)[resource_id]:
|
|
interval_hours = (stop - start).total_seconds() / 3600
|
|
if hours <= interval_hours:
|
|
return revert(start + timedelta(hours=hours))
|
|
hours -= interval_hours
|
|
return False
|
|
else:
|
|
hours = abs(hours)
|
|
delta = timedelta(days=14)
|
|
for n in range(100):
|
|
dt = day_dt - delta * n
|
|
for start, stop, meta in reversed(get_intervals(dt - delta, dt)[resource_id]):
|
|
interval_hours = (stop - start).total_seconds() / 3600
|
|
if hours <= interval_hours:
|
|
return revert(stop - timedelta(hours=hours))
|
|
hours -= interval_hours
|
|
return False
|
|
|
|
def plan_days(self, days, day_dt, compute_leaves=False, domain=None):
|
|
"""
|
|
`compute_leaves` controls whether or not this method is taking into
|
|
account the global leaves.
|
|
|
|
`domain` controls the way leaves are recognized.
|
|
None means default value ('time_type', '=', 'leave')
|
|
|
|
Returns the datetime of a days scheduling.
|
|
"""
|
|
day_dt, revert = make_aware(day_dt)
|
|
|
|
# which method to use for retrieving intervals
|
|
if compute_leaves:
|
|
get_intervals = partial(self._work_intervals_batch, domain=domain)
|
|
else:
|
|
get_intervals = self._attendance_intervals_batch
|
|
|
|
if days > 0:
|
|
found = set()
|
|
delta = timedelta(days=14)
|
|
for n in range(100):
|
|
dt = day_dt + delta * n
|
|
for start, stop, meta in get_intervals(dt, dt + delta)[False]:
|
|
found.add(start.date())
|
|
if len(found) == days:
|
|
return revert(stop)
|
|
return False
|
|
|
|
elif days < 0:
|
|
days = abs(days)
|
|
found = set()
|
|
delta = timedelta(days=14)
|
|
for n in range(100):
|
|
dt = day_dt - delta * n
|
|
for start, stop, meta in reversed(get_intervals(dt - delta, dt)[False]):
|
|
found.add(start.date())
|
|
if len(found) == days:
|
|
return revert(start)
|
|
return False
|
|
|
|
else:
|
|
return revert(day_dt)
|
|
|
|
def _get_max_number_of_hours(self, start, end):
|
|
self.ensure_one()
|
|
if not self.attendance_ids:
|
|
return 0
|
|
mapped_data = defaultdict(lambda: 0)
|
|
for attendance in self.attendance_ids.filtered(lambda a: a.day_period != 'lunch' and ((not a.date_from or not a.date_to) or (a.date_from <= end.date() and a.date_to >= start.date()))):
|
|
mapped_data[(attendance.week_type, attendance.dayofweek)] += attendance.hour_to - attendance.hour_from
|
|
return max(mapped_data.values())
|