# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.

import pytz

from collections import defaultdict
from datetime import datetime, timedelta
from operator import itemgetter
from pytz import timezone

from odoo import models, fields, api, exceptions, _
from odoo.addons.resource.models.utils import Intervals
from odoo.tools import format_datetime
from odoo.osv.expression import AND, OR
from odoo.tools.float_utils import float_is_zero
from odoo.exceptions import AccessError
from odoo.tools import format_duration


def get_google_maps_url(latitude, longitude):
    return "https://maps.google.com?q=%s,%s" % (latitude, longitude)


class HrAttendance(models.Model):
    _name = "hr.attendance"
    _description = "Attendance"
    _order = "check_in desc"
    _inherit = "mail.thread"

    def _default_employee(self):
        return self.env.user.employee_id

    employee_id = fields.Many2one('hr.employee', string="Employee", default=_default_employee, required=True, ondelete='cascade', index=True)
    department_id = fields.Many2one('hr.department', string="Department", related="employee_id.department_id",
        readonly=True)
    check_in = fields.Datetime(string="Check In", default=fields.Datetime.now, required=True, tracking=True)
    check_out = fields.Datetime(string="Check Out", tracking=True)
    worked_hours = fields.Float(string='Worked Hours', compute='_compute_worked_hours', store=True, readonly=True)
    color = fields.Integer(compute='_compute_color')
    overtime_hours = fields.Float(string="Over Time", compute='_compute_overtime_hours', store=True)
    in_latitude = fields.Float(string="Latitude", digits=(10, 7), readonly=True)
    in_longitude = fields.Float(string="Longitude", digits=(10, 7), readonly=True)
    in_country_name = fields.Char(string="Country", help="Based on IP Address", readonly=True)
    in_city = fields.Char(string="City", readonly=True)
    in_ip_address = fields.Char(string="IP Address", readonly=True)
    in_browser = fields.Char(string="Browser", readonly=True)
    in_mode = fields.Selection(string="Mode",
                               selection=[('kiosk', "Kiosk"),
                                          ('systray', "Systray"),
                                          ('manual', "Manual")],
                               readonly=True,
                               default='manual')
    out_latitude = fields.Float(digits=(10, 7), readonly=True)
    out_longitude = fields.Float(digits=(10, 7), readonly=True)
    out_country_name = fields.Char(help="Based on IP Address", readonly=True)
    out_city = fields.Char(readonly=True)
    out_ip_address = fields.Char(readonly=True)
    out_browser = fields.Char(readonly=True)
    out_mode = fields.Selection(selection=[('kiosk', "Kiosk"),
                                           ('systray', "Systray"),
                                           ('manual', "Manual")],
                                readonly=True,
                                default='manual')

    def _compute_color(self):
        for attendance in self:
            if attendance.check_out:
                attendance.color = 1 if attendance.worked_hours > 16 else 0
            else:
                attendance.color = 1 if attendance.check_in < (datetime.today() - timedelta(days=1)) else 10

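    # Overtime is stored per employee and per day in hr.attendance.overtime.
    # The raw SQL below pairs each attendance with the overtime record of the
    # same day; the day's overtime total is then consumed attendance by
    # attendance (most recent check_in first) to work out which share of each
    # attendance's worked hours counts as overtime.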
    @api.depends('worked_hours')
    def _compute_overtime_hours(self):
        att_progress_values = dict()
        if self.employee_id:
            self.env['hr.attendance'].flush_model(['worked_hours'])
            self.env['hr.attendance.overtime'].flush_model(['duration'])
            self.env.cr.execute('''
                SELECT att.id as att_id,
                    att.worked_hours as att_wh,
                    ot.id as ot_id,
                    ot.duration as ot_d,
                    ot.date as od,
                    att.check_in as ad
                FROM hr_attendance att
                INNER JOIN hr_attendance_overtime ot
                ON date_trunc('day',att.check_in) = date_trunc('day', ot.date)
                AND date_trunc('day',att.check_out) = date_trunc('day', ot.date)
                AND att.employee_id IN %s
                AND att.employee_id = ot.employee_id
                ORDER BY att.check_in DESC
            ''', (tuple(self.employee_id.ids),))
            a = self.env.cr.dictfetchall()
            grouped_dict = dict()
            for row in a:
                if row['ot_id'] and row['att_wh']:
                    if row['ot_id'] not in grouped_dict:
                        grouped_dict[row['ot_id']] = {'attendances': [(row['att_id'], row['att_wh'])], 'overtime_duration': row['ot_d']}
                    else:
                        grouped_dict[row['ot_id']]['attendances'].append((row['att_id'], row['att_wh']))

            for ot in grouped_dict:
                ot_bucket = grouped_dict[ot]['overtime_duration']
                for att in grouped_dict[ot]['attendances']:
                    if ot_bucket > 0:
                        sub_time = att[1] - ot_bucket
                        if sub_time < 0:
                            att_progress_values[att[0]] = 0
                            ot_bucket -= att[1]
                        else:
                            att_progress_values[att[0]] = float(((att[1] - ot_bucket) / att[1])*100)
                            ot_bucket = 0
                    else:
                        att_progress_values[att[0]] = 100
        for attendance in self:
            attendance.overtime_hours = attendance.worked_hours * ((100 - att_progress_values.get(attendance.id, 100))/100)

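    # Illustrative result (times assumed, 30-minute lunch): "From 08:00" while
    # the employee is still checked in, "08:30 : (08:00-17:00)" once checked out.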
    @api.depends('employee_id', 'check_in', 'check_out')
    def _compute_display_name(self):
        for attendance in self:
            if not attendance.check_out:
                attendance.display_name = _(
                    "From %s",
                    format_datetime(self.env, attendance.check_in, dt_format="HH:mm"),
                )
            else:
                attendance.display_name = _(
                    "%s : (%s-%s)",
                    format_duration(attendance.worked_hours),
                    format_datetime(self.env, attendance.check_in, dt_format="HH:mm"),
                    format_datetime(self.env, attendance.check_out, dt_format="HH:mm"),
                )

    def _get_employee_calendar(self):
        self.ensure_one()
        return self.employee_id.resource_calendar_id or self.employee_id.company_id.resource_calendar_id

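    # Worked hours are the time between check-in and check-out minus any lunch
    # interval that the employee's working calendar defines within that span.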
    @api.depends('check_in', 'check_out')
    def _compute_worked_hours(self):
        for attendance in self:
            if attendance.check_out and attendance.check_in and attendance.employee_id:
                calendar = attendance._get_employee_calendar()
                resource = attendance.employee_id.resource_id
                tz = timezone(calendar.tz)
                check_in_tz = attendance.check_in.astimezone(tz)
                check_out_tz = attendance.check_out.astimezone(tz)
                lunch_intervals = calendar._attendance_intervals_batch(
                    check_in_tz, check_out_tz, resource, lunch=True)
                attendance_intervals = Intervals([(check_in_tz, check_out_tz, attendance)]) - lunch_intervals[resource.id]
                delta = sum((i[1] - i[0]).total_seconds() for i in attendance_intervals)
                attendance.worked_hours = delta / 3600.0
            else:
                attendance.worked_hours = False

    @api.constrains('check_in', 'check_out')
    def _check_validity_check_in_check_out(self):
        """ Verifies that check_in is earlier than check_out. """
        for attendance in self:
            if attendance.check_in and attendance.check_out:
                if attendance.check_out < attendance.check_in:
                    raise exceptions.ValidationError(_('"Check Out" time cannot be earlier than "Check In" time.'))

    @api.constrains('check_in', 'check_out', 'employee_id')
    def _check_validity(self):
        """ Verifies the validity of the attendance record compared to the others from the same employee.
            For the same employee we must have:
                * maximum 1 "open" attendance record (without check_out)
                * no overlapping time slices with previous employee records
        """
        for attendance in self:
            # we take the latest attendance before our check_in time and check it doesn't overlap with ours
            last_attendance_before_check_in = self.env['hr.attendance'].search([
                ('employee_id', '=', attendance.employee_id.id),
                ('check_in', '<=', attendance.check_in),
                ('id', '!=', attendance.id),
            ], order='check_in desc', limit=1)
            if last_attendance_before_check_in and last_attendance_before_check_in.check_out and last_attendance_before_check_in.check_out > attendance.check_in:
                raise exceptions.ValidationError(_("Cannot create new attendance record for %(empl_name)s, the employee was already checked in on %(datetime)s",
                                                   empl_name=attendance.employee_id.name,
                                                   datetime=format_datetime(self.env, attendance.check_in, dt_format=False)))

            if not attendance.check_out:
                # if our attendance is "open" (no check_out), we verify there is no other "open" attendance
                no_check_out_attendances = self.env['hr.attendance'].search([
                    ('employee_id', '=', attendance.employee_id.id),
                    ('check_out', '=', False),
                    ('id', '!=', attendance.id),
                ], order='check_in desc', limit=1)
                if no_check_out_attendances:
                    raise exceptions.ValidationError(_("Cannot create new attendance record for %(empl_name)s, the employee hasn't checked out since %(datetime)s",
                                                       empl_name=attendance.employee_id.name,
                                                       datetime=format_datetime(self.env, no_check_out_attendances.check_in, dt_format=False)))
            else:
                # we verify that the latest attendance with check_in time before our check_out time
                # is the same as the one before our check_in time computed before, otherwise it overlaps
                last_attendance_before_check_out = self.env['hr.attendance'].search([
                    ('employee_id', '=', attendance.employee_id.id),
                    ('check_in', '<', attendance.check_out),
                    ('id', '!=', attendance.id),
                ], order='check_in desc', limit=1)
                if last_attendance_before_check_out and last_attendance_before_check_in != last_attendance_before_check_out:
                    raise exceptions.ValidationError(_("Cannot create new attendance record for %(empl_name)s, the employee was already checked in on %(datetime)s",
                                                       empl_name=attendance.employee_id.name,
                                                       datetime=format_datetime(self.env, last_attendance_before_check_out.check_in, dt_format=False)))

    @api.model
    def _get_day_start_and_day(self, employee, dt):
        # Returns a tuple containing the datetime in naive UTC of the employee's start of the day
        # and the date it was for that employee
        if not dt.tzinfo:
            date_employee_tz = pytz.utc.localize(dt).astimezone(pytz.timezone(employee._get_tz()))
        else:
            date_employee_tz = dt
        start_day_employee_tz = date_employee_tz.replace(hour=0, minute=0, second=0)
        return (start_day_employee_tz.astimezone(pytz.utc).replace(tzinfo=None), start_day_employee_tz.date())

    def _get_attendances_dates(self):
        # Returns a dictionary {employee_id: set((datetimes, dates))}
        attendances_emp = defaultdict(set)
        for attendance in self.filtered(lambda a: a.employee_id.company_id.hr_attendance_overtime and a.check_in):
            check_in_day_start = attendance._get_day_start_and_day(attendance.employee_id, attendance.check_in)
            if check_in_day_start[0] < datetime.combine(attendance.employee_id.company_id.overtime_start_date, datetime.min.time()):
                continue
            attendances_emp[attendance.employee_id].add(check_in_day_start)
            if attendance.check_out:
                check_out_day_start = attendance._get_day_start_and_day(attendance.employee_id, attendance.check_out)
                attendances_emp[attendance.employee_id].add(check_out_day_start)
        return attendances_emp

    def _get_overtime_leave_domain(self):
        return []

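    # Recompute hr.attendance.overtime for every (employee, day) touched by
    # these attendances: planned time comes from the working calendar (minus
    # leaves), actual time from the day's attendances, and the difference is
    # stored as overtime, subject to the company/employee minute thresholds
    # configured on the company.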
    def _update_overtime(self, employee_attendance_dates=None):
        if employee_attendance_dates is None:
            employee_attendance_dates = self._get_attendances_dates()

        overtime_to_unlink = self.env['hr.attendance.overtime']
        overtime_vals_list = []
        affected_employees = self.env['hr.employee']
        for emp, attendance_dates in employee_attendance_dates.items():
            # _get_attendances_dates returns the dates translated from the local timezone without tzinfo,
            # and contains all the dates which we need to check for overtime
            attendance_domain = []
            for attendance_date in attendance_dates:
                attendance_domain = OR([attendance_domain, [
                    ('check_in', '>=', attendance_date[0]), ('check_in', '<', attendance_date[0] + timedelta(hours=24)),
                ]])
            attendance_domain = AND([[('employee_id', '=', emp.id)], attendance_domain])

            # Attendances per LOCAL day
            attendances_per_day = defaultdict(lambda: self.env['hr.attendance'])
            all_attendances = self.env['hr.attendance'].search(attendance_domain)
            for attendance in all_attendances:
                check_in_day_start = attendance._get_day_start_and_day(attendance.employee_id, attendance.check_in)
                attendances_per_day[check_in_day_start[1]] += attendance

            # As _attendance_intervals_batch and _leave_intervals_batch both take localized dates we need to localize those dates
            start = pytz.utc.localize(min(attendance_dates, key=itemgetter(0))[0])
            stop = pytz.utc.localize(max(attendance_dates, key=itemgetter(0))[0] + timedelta(hours=24))

            # Retrieve expected attendance intervals
            calendar = emp.resource_calendar_id or emp.company_id.resource_calendar_id
            expected_attendances = calendar._attendance_intervals_batch(
                start, stop, emp.resource_id
            )[emp.resource_id.id]
            # Subtract Global Leaves and Employee's Leaves
            leave_intervals = calendar._leave_intervals_batch(
                start, stop, emp.resource_id, domain=AND([
                    self._get_overtime_leave_domain(),
                    [('company_id', 'in', [False, emp.company_id.id])],
                ])
            )
            expected_attendances -= leave_intervals[False] | leave_intervals[emp.resource_id.id]

            # working_times = {date: [(start, stop)]}
            working_times = defaultdict(lambda: [])
            for expected_attendance in expected_attendances:
                # Exclude resource.calendar.attendance
                working_times[expected_attendance[0].date()].append(expected_attendance[:2])

            overtimes = self.env['hr.attendance.overtime'].sudo().search([
                ('employee_id', '=', emp.id),
                ('date', 'in', [day_data[1] for day_data in attendance_dates]),
                ('adjustment', '=', False),
            ])

            company_threshold = emp.company_id.overtime_company_threshold / 60.0
            employee_threshold = emp.company_id.overtime_employee_threshold / 60.0

            for day_data in attendance_dates:
                attendance_date = day_data[1]
                attendances = attendances_per_day.get(attendance_date, self.browse())
                unfinished_shifts = attendances.filtered(lambda a: not a.check_out)
                overtime_duration = 0
                overtime_duration_real = 0
                # Overtime is not counted if any shift is not closed or if there are no attendances for that day,
                # this could happen when deleting attendances.
                if not unfinished_shifts and attendances:
                    # The employee usually doesn't work on that day
                    if not working_times[attendance_date]:
                        # User does not have any resource_calendar_attendance for that day (week-end for example)
                        overtime_duration = sum(attendances.mapped('worked_hours'))
                        overtime_duration_real = overtime_duration
                    # The employee usually works on that day
                    else:
                        # Compute start and end time for that day
                        planned_start_dt, planned_end_dt = False, False
                        planned_work_duration = 0
                        for calendar_attendance in working_times[attendance_date]:
                            planned_start_dt = min(planned_start_dt, calendar_attendance[0]) if planned_start_dt else calendar_attendance[0]
                            planned_end_dt = max(planned_end_dt, calendar_attendance[1]) if planned_end_dt else calendar_attendance[1]
                            planned_work_duration += (calendar_attendance[1] - calendar_attendance[0]).total_seconds() / 3600.0
                        # Count time before, during and after 'working hours'
                        pre_work_time, work_duration, post_work_time = 0, 0, 0

                        for attendance in attendances:
                            # consider check_in as planned_start_dt if within threshold
                            # if delta_in < 0: Checked in after supposed start of the day
                            # if delta_in > 0: Checked in before supposed start of the day
                            local_check_in = pytz.utc.localize(attendance.check_in)
                            delta_in = (planned_start_dt - local_check_in).total_seconds() / 3600.0

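                            # Illustrative (values assumed): with a planned start at 08:00
                            # and a 15-minute company threshold (0.25 h), a check-in at
                            # 07:50 gives delta_in = +1/6 h <= 0.25, so the check-in is
                            # snapped to 08:00 and those 10 minutes are not counted as
                            # time worked before the planned start.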
                            # Started before or after planned date within the threshold interval
                            if (delta_in > 0 and delta_in <= company_threshold) or\
                                (delta_in < 0 and abs(delta_in) <= employee_threshold):
                                local_check_in = planned_start_dt
                            local_check_out = pytz.utc.localize(attendance.check_out)

                            # same for check_out as planned_end_dt
                            delta_out = (local_check_out - planned_end_dt).total_seconds() / 3600.0
                            # if delta_out < 0: Checked out before supposed end of the day
                            # if delta_out > 0: Checked out after supposed end of the day

                            # Finished before or after planned date within the threshold interval
                            if (delta_out > 0 and delta_out <= company_threshold) or\
                                (delta_out < 0 and abs(delta_out) <= employee_threshold):
                                local_check_out = planned_end_dt

                            # There is an overtime at the start of the day
                            if local_check_in < planned_start_dt:
                                pre_work_time += (min(planned_start_dt, local_check_out) - local_check_in).total_seconds() / 3600.0
                            # Interval inside the working hours -> Considered as working time
                            if local_check_in <= planned_end_dt and local_check_out >= planned_start_dt:
                                start_dt = max(planned_start_dt, local_check_in)
                                stop_dt = min(planned_end_dt, local_check_out)
                                work_duration += (stop_dt - start_dt).total_seconds() / 3600.0
                                # remove lunch time from work duration
                                lunch_intervals = calendar._attendance_intervals_batch(start_dt, stop_dt, emp.resource_id, lunch=True)
                                work_duration -= sum((i[1] - i[0]).total_seconds() / 3600.0 for i in lunch_intervals[emp.resource_id.id])

                            # There is an overtime at the end of the day
                            if local_check_out > planned_end_dt:
                                post_work_time += (local_check_out - max(planned_end_dt, local_check_in)).total_seconds() / 3600.0

                        # Overtime within the planned work hours, plus any before/after work time that exceeds the company threshold
                        overtime_duration = work_duration - planned_work_duration
                        if pre_work_time > company_threshold:
                            overtime_duration += pre_work_time
                        if post_work_time > company_threshold:
                            overtime_duration += post_work_time
                        # Global overtime including the thresholds
                        overtime_duration_real = sum(attendances.mapped('worked_hours')) - planned_work_duration

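                # overtime_duration is the threshold-filtered figure,
                # overtime_duration_real the raw difference between worked and planned hours.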
                overtime = overtimes.filtered(lambda o: o.date == attendance_date)
                if not float_is_zero(overtime_duration, 2) or unfinished_shifts:
                    # Do not create if any attendance doesn't have a check_out, update if exists
                    if unfinished_shifts:
                        overtime_duration = 0
                    if not overtime and overtime_duration:
                        overtime_vals_list.append({
                            'employee_id': emp.id,
                            'date': attendance_date,
                            'duration': overtime_duration,
                            'duration_real': overtime_duration_real,
                        })
                    elif overtime:
                        overtime.sudo().write({
                            'duration': overtime_duration,
                            'duration_real': overtime_duration
                        })
                        affected_employees |= overtime.employee_id
                elif overtime:
                    overtime_to_unlink |= overtime
        created_overtimes = self.env['hr.attendance.overtime'].sudo().create(overtime_vals_list)
        employees_worked_hours_to_compute = (affected_employees.ids +
                                             created_overtimes.employee_id.ids +
                                             overtime_to_unlink.employee_id.ids)
        overtime_to_unlink.sudo().unlink()
        self.env.add_to_compute(self._fields['overtime_hours'],
                                self.search([('employee_id', 'in', employees_worked_hours_to_compute)]))

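    # create / write / unlink below all route through _update_overtime() so the
    # per-day overtime records stay in sync with the attendance data.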
    @api.model_create_multi
    def create(self, vals_list):
        res = super().create(vals_list)
        res._update_overtime()
        return res

    def write(self, vals):
        if vals.get('employee_id') and \
            vals['employee_id'] not in self.env.user.employee_ids.ids and \
            not self.env.user.has_group('hr_attendance.group_hr_attendance_officer'):
            raise AccessError(_("You do not have access: users cannot edit attendances that are not their own."))
        attendances_dates = self._get_attendances_dates()
        result = super(HrAttendance, self).write(vals)
        if any(field in vals for field in ['employee_id', 'check_in', 'check_out']):
            # Merge attendance dates before and after write to recompute the
            # overtime if the attendances have been moved to another day
            for emp, dates in self._get_attendances_dates().items():
                attendances_dates[emp] |= dates
            self._update_overtime(attendances_dates)
        return result

    def unlink(self):
        attendances_dates = self._get_attendances_dates()
        res = super().unlink()
        self._update_overtime(attendances_dates)
        return res

    @api.returns('self', lambda value: value.id)
    def copy(self, default=None):
        raise exceptions.UserError(_('You cannot duplicate an attendance.'))

    def action_in_attendance_maps(self):
        self.ensure_one()
        return {
            'type': 'ir.actions.act_url',
            'url': get_google_maps_url(self.in_latitude, self.in_longitude),
            'target': 'new'
        }

    def action_out_attendance_maps(self):
        self.ensure_one()
        return {
            'type': 'ir.actions.act_url',
            'url': get_google_maps_url(self.out_latitude, self.out_longitude),
            'target': 'new'
        }