# hr_contract/models/hr_contract.py
#
# 369 lines
# 18 KiB
# Python
# Raw Permalink Normal View History

# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import threading
from datetime import date
from dateutil.relativedelta import relativedelta
from odoo import api, fields, models, _
from odoo.exceptions import ValidationError
from odoo.osv import expression
import logging
_logger = logging.getLogger(__name__)
class Contract(models.Model):
    # Employee contract model. Inherits the mail thread and activity mixins;
    # posting on the chatter only requires read access.
    _name = 'hr.contract'
    _description = 'Contract'
    _inherit = ['mail.thread', 'mail.activity.mixin']
    _mail_post_access = 'read'

    # --- Identification -----------------------------------------------------
    name = fields.Char('Contract Reference', required=True)
    active = fields.Boolean(default=True)
    # Recomputed from the company (see _compute_structure_type_id) but still
    # editable by the user.
    structure_type_id = fields.Many2one(
        'hr.payroll.structure.type',
        string="Salary Structure Type",
        compute="_compute_structure_type_id",
        readonly=False,
        store=True,
    )
    employee_id = fields.Many2one(
        'hr.employee',
        string='Employee',
        tracking=True,
        domain="['|', ('company_id', '=', False), ('company_id', '=', company_id)]",
        index=True,
    )

    # --- Values copied from the employee (see _compute_employee_contract) ---
    department_id = fields.Many2one(
        'hr.department',
        compute='_compute_employee_contract',
        store=True,
        readonly=False,
        domain="['|', ('company_id', '=', False), ('company_id', '=', company_id)]",
        string="Department",
    )
    job_id = fields.Many2one(
        'hr.job',
        compute='_compute_employee_contract',
        store=True,
        readonly=False,
        domain="['|', ('company_id', '=', False), ('company_id', '=', company_id)]",
        string='Job Position',
    )

    # --- Period ---------------------------------------------------------------
    date_start = fields.Date(
        'Start Date',
        required=True,
        default=fields.Date.today,
        tracking=True,
        index=True,
    )
    date_end = fields.Date(
        'End Date',
        tracking=True,
        help="End date of the contract (if it's a fixed-term contract).",
    )
    trial_date_end = fields.Date(
        'End of Trial Period',
        help="End date of the trial period (if there is one).",
    )
    # Defaults to the current company's calendar and is not copied on duplicate.
    resource_calendar_id = fields.Many2one(
        'resource.calendar',
        'Working Schedule',
        compute='_compute_employee_contract',
        store=True,
        readonly=False,
        default=lambda self: self.env.company.resource_calendar_id.id,
        copy=False,
        index=True,
        domain="['|', ('company_id', '=', False), ('company_id', '=', company_id)]",
    )

    # --- Remuneration ---------------------------------------------------------
    wage = fields.Monetary(
        'Wage',
        required=True,
        tracking=True,
        help="Employee's monthly gross wage.",
        group_operator="avg",
    )
    contract_wage = fields.Monetary('Contract Wage', compute='_compute_contract_wage')
    notes = fields.Html('Notes')

    # --- Workflow -------------------------------------------------------------
    state = fields.Selection(
        [
            ('draft', 'New'),
            ('open', 'Running'),
            ('close', 'Expired'),
            ('cancel', 'Cancelled'),
        ],
        string='Status',
        group_expand='_expand_states',
        copy=False,
        tracking=True,
        help='Status of the contract',
        default='draft',
    )
    company_id = fields.Many2one(
        'res.company',
        compute='_compute_employee_contract',
        store=True,
        readonly=False,
        default=lambda self: self.env.company,
        required=True,
    )
    company_country_id = fields.Many2one(
        'res.country',
        string="Company country",
        related='company_id.country_id',
        readonly=True,
    )
    country_code = fields.Char(
        related='company_country_id.code',
        depends=['company_country_id'],
        readonly=True,
    )
    contract_type_id = fields.Many2one('hr.contract.type', "Contract Type")
    contracts_count = fields.Integer(related='employee_id.contracts_count')

    # kanban_state:
    #  * draft + green = "Incoming" state (will be set as Open once the contract has started)
    #  * open + red = "Pending" state (will be set as Closed once the contract has ended)
    #  * red = Shows a warning on the employees kanban view
    kanban_state = fields.Selection(
        [
            ('normal', 'Grey'),
            ('done', 'Green'),
            ('blocked', 'Red'),
        ],
        string='Kanban State',
        default='normal',
        tracking=True,
        copy=False,
    )
    currency_id = fields.Many2one(
        string="Currency",
        related='company_id.currency_id',
        readonly=True,
    )

    # Editable mirrors of the employee's work permit / visa numbers.
    permit_no = fields.Char('Work Permit No', related="employee_id.permit_no", readonly=False)
    visa_no = fields.Char('Visa No', related="employee_id.visa_no", readonly=False)

    def _get_hr_responsible_domain(self):
        # Domain string for hr_responsible_id: internal users (share = False)
        # of the contract's company that belong to the hr.group_hr_user group.
        hr_user_group_id = self.env.ref('hr.group_hr_user').id
        return (
            "[('share', '=', False), ('company_ids', 'in', company_id), ('groups_id', 'in', %s)]"
            % hr_user_group_id
        )

    hr_responsible_id = fields.Many2one(
        'res.users',
        'HR Responsible',
        tracking=True,
        help="Person responsible for validating the employee's contracts.",
        domain=_get_hr_responsible_domain,
    )
    calendar_mismatch = fields.Boolean(
        compute='_compute_calendar_mismatch',
        compute_sudo=True,
    )
    first_contract_date = fields.Date(related='employee_id.first_contract_date')
@api.depends('employee_id.resource_calendar_id', 'resource_calendar_id')
def _compute_calendar_mismatch(self):
    """Set ``calendar_mismatch`` on each contract.

    The flag is True when the contract's working schedule is not the same
    record as the working schedule of its employee.
    """
    for record in self:
        contract_calendar = record.resource_calendar_id
        employee_calendar = record.employee_id.resource_calendar_id
        record.calendar_mismatch = contract_calendar != employee_calendar
def _expand_states(self, states, domain, order):
return [key for key, val in self._fields['state'].selection]
@api.depends('employee_id')
def _compute_employee_contract(self):
    """Copy job, department, working schedule and company from the employee.

    Contracts that have no employee are skipped and keep their values.
    """
    for record in self.filtered('employee_id'):
        employee = record.employee_id
        record.job_id = employee.job_id
        record.department_id = employee.department_id
        record.resource_calendar_id = employee.resource_calendar_id
        record.company_id = employee.company_id
@api.depends('company_id')
def _compute_structure_type_id(self):
    """Assign a default salary structure type matching the company country.

    A contract is only touched when it has no structure type yet, or when
    its structure type's country differs from the company's country. The
    default is the first structure type found for that country, falling
    back to the first structure type without any country.
    """
    # country id -> default structure type; avoids repeating the searches
    # for contracts sharing the same country within one call.
    defaults_by_country = {}

    def _default_salary_structure(country_id):
        if defaults_by_country.get(country_id) is None:
            StructureType = self.env['hr.payroll.structure.type']
            defaults_by_country[country_id] = (
                StructureType.search([('country_id', '=', country_id)], limit=1)
                or StructureType.search([('country_id', '=', False)], limit=1)
            )
        return defaults_by_country[country_id]

    for record in self:
        current = record.structure_type_id
        if current and current.country_id == record.company_id.country_id:
            # Already consistent with the company country: keep it.
            continue
        record.structure_type_id = _default_salary_structure(record.company_id.country_id.id)
@api.onchange('structure_type_id')
def _onchange_structure_type_id(self):
    """Propose the structure type's default working schedule.

    The schedule is only applied when the structure type defines one and
    that schedule belongs to the same company as the contract.
    """
    proposed_calendar = self.structure_type_id.default_resource_calendar_id
    if not proposed_calendar or proposed_calendar.company_id != self.company_id:
        return
    self.resource_calendar_id = proposed_calendar
@api.constrains('employee_id', 'state', 'kanban_state', 'date_start', 'date_end')
def _check_current_contract(self):
    """Forbid overlapping contracts for one employee within one company.

    Only contracts that have an employee and are running, expired, or
    "incoming" (draft with a green kanban state) are checked, and only
    against other contracts in those same situations.

    :raises ValidationError: when another such contract overlaps the period
        of one of the checked contracts.
    """
    def _needs_check(contract):
        counts_as_active = contract.state not in ['draft', 'cancel'] or (
            contract.state == 'draft' and contract.kanban_state == 'done'
        )
        return counts_as_active and contract.employee_id

    for contract in self.filtered(_needs_check):
        if contract.date_end:
            start_domain = [('date_start', '<=', contract.date_end)]
            # NOTE(review): strict '>' here vs '>=' in the open-ended branch
            # below - confirm whether the difference is intentional.
            end_domain = ['|', ('date_end', '>', contract.date_start), ('date_end', '=', False)]
        else:
            # Open-ended contract: any other contract ending on/after its
            # start (or never ending) overlaps.
            start_domain = []
            end_domain = ['|', ('date_end', '>=', contract.date_start), ('date_end', '=', False)]

        same_employee_domain = [
            ('id', '!=', contract.id),
            ('employee_id', '=', contract.employee_id.id),
            ('company_id', '=', contract.company_id.id),
            '|',
                ('state', 'in', ['open', 'close']),
                '&',
                    ('state', '=', 'draft'),
                    ('kanban_state', '=', 'done'),  # replaces incoming
        ]
        overlap_domain = expression.AND([same_employee_domain, start_domain, end_domain])
        if not self.search_count(overlap_domain):
            continue
        raise ValidationError(
            _(
                'An employee can only have one contract at the same time. (Excluding Draft and Cancelled contracts).\n\nEmployee: %(employee_name)s',
                employee_name=contract.employee_id.name
            )
        )
@api.constrains('date_start', 'date_end')
def _check_dates(self):
    """Ensure a contract does not end before it starts.

    Contracts without an end date are always accepted.

    :raises ValidationError: when ``date_start`` is after ``date_end``.
    """
    for record in self:
        if not record.date_end or record.date_start <= record.date_end:
            continue
        raise ValidationError(_(
            'Contract %(contract)s: start date (%(start)s) must be earlier than contract end date (%(end)s).',
            contract=record.name, start=record.date_start, end=record.date_end,
        ))
@api.model
def update_state(self):
    """Advance contract states and warn about upcoming expirations.

    Steps, in order:

    1. For every company, collect running contracts that are not already
       flagged red and whose end date (resp. whose employee's work permit
       expiration date) falls between tomorrow and the company's notice
       period. Schedule a to-do activity and post a message on each, then
       set their kanban state to ``blocked``.
    2. Close running contracts whose end date or employee work permit
       expiration date is today or earlier.
    3. Open "incoming" contracts (draft + green) whose start date is today
       or earlier.
    4. Give an end date to closed contracts that have none but are followed
       by a later contract of the same employee (the day before that
       contract starts).

    When ``from_cron`` is present in the context, writes go through
    :meth:`_safe_write_for_cron` in its per-record mode.

    :return: True
    """
    from_cron = 'from_cron' in self.env.context
    # Single reference date for the whole run, so every threshold and
    # message agrees even if the job runs across midnight.
    today = fields.Date.today()
    tomorrow = today + relativedelta(days=1)
    today_str = fields.Date.to_string(today)

    contracts = self.env['hr.contract']
    work_permit_contracts = self.env['hr.contract']
    # Notice periods are configured per company, hence one search each.
    for company in self.env['res.company'].search([]):
        contracts += self.search([
            ('state', '=', 'open'), ('kanban_state', '!=', 'blocked'), ('company_id', '=', company.id),
            '&',
            ('date_end', '<=', today + relativedelta(days=company.contract_expiration_notice_period)),
            ('date_end', '>=', tomorrow),
        ])
        work_permit_contracts += self.search([
            ('state', '=', 'open'), ('kanban_state', '!=', 'blocked'), ('company_id', '=', company.id),
            '&',
            ('employee_id.work_permit_expiration_date', '<=', today + relativedelta(days=company.work_permit_expiration_notice_period)),
            ('employee_id.work_permit_expiration_date', '>=', tomorrow),
        ])

    for contract in contracts:
        contract.with_context(mail_activity_quick_update=True).activity_schedule(
            'mail.mail_activity_data_todo', contract.date_end,
            _("The contract of %s is about to expire.", contract.employee_id.name),
            user_id=contract.hr_responsible_id.id or self.env.uid)
        contract.message_post(
            body=_(
                "According to the contract's end date, this contract has been put in red on the %s. Please advise and correct.",
                today
            )
        )

    for contract in work_permit_contracts:
        # The reminder is about the work permit, so its deadline is the
        # permit expiration date (always set here: the search above filters
        # on it), not the contract end date, which may be empty or far away.
        contract.with_context(mail_activity_quick_update=True).activity_schedule(
            'mail.mail_activity_data_todo', contract.employee_id.work_permit_expiration_date,
            _("The work permit of %s is about to expire.", contract.employee_id.name),
            user_id=contract.hr_responsible_id.id or self.env.uid)
        contract.message_post(
            body=_(
                "According to Employee's Working Permit Expiration Date, this contract has been put in red on the %s. Please advise and correct.",
                today
            )
        )

    if contracts:
        contracts._safe_write_for_cron({'kanban_state': 'blocked'}, from_cron)
    if work_permit_contracts:
        work_permit_contracts._safe_write_for_cron({'kanban_state': 'blocked'}, from_cron)

    contracts_to_close = self.search([
        ('state', '=', 'open'),
        '|',
        ('date_end', '<=', today_str),
        ('employee_id.work_permit_expiration_date', '<=', today_str),
    ])
    if contracts_to_close:
        contracts_to_close._safe_write_for_cron({'state': 'close'}, from_cron)

    contracts_to_open = self.search([
        ('state', '=', 'draft'),
        ('kanban_state', '=', 'done'),
        ('date_start', '<=', today_str),
    ])
    if contracts_to_open:
        contracts_to_open._safe_write_for_cron({'state': 'open'}, from_cron)

    # Ensure all closed contract followed by a new contract have a end date.
    # If closed contract has no closed date, the work entries will be generated for an unlimited period.
    open_ended_closed = self.search([
        ('date_end', '=', False),
        ('state', '=', 'close'),
        ('employee_id', '!=', False),
    ])
    for contract in open_ended_closed:
        later_contracts = [
            ('employee_id', '=', contract.employee_id.id),
            ('date_start', '>', contract.date_start),
        ]
        # Prefer the next contract that is neither cancelled nor draft;
        # otherwise fall back to the next contract in any state.
        next_contract = (
            self.search(
                later_contracts + [('state', 'not in', ['cancel', 'draft'])],
                order="date_start asc", limit=1)
            or self.search(later_contracts, order="date_start asc", limit=1)
        )
        if next_contract:
            contract._safe_write_for_cron(
                {'date_end': next_contract.date_start - relativedelta(days=1)}, from_cron)
    return True
def _safe_write_for_cron(self, vals, from_cron=False):
if from_cron:
auto_commit = not getattr(threading.current_thread(), 'testing', False)
for contract in self:
try:
with self.env.cr.savepoint():
contract.write(vals)
except ValidationError as e:
_logger.warning(e)
else:
if auto_commit:
self.env.cr.commit()
else:
self.write(vals)
def _assign_open_contract(self):
for contract in self:
contract.employee_id.sudo().write({'contract_id': contract.id})
@api.depends('wage')
def _compute_contract_wage(self):
    """Fill ``contract_wage`` from :meth:`_get_contract_wage` per contract."""
    for record in self:
        wage_amount = record._get_contract_wage()
        record.contract_wage = wage_amount
def _get_contract_wage(self):
if not self:
return 0
self.ensure_one()
return self[self._get_contract_wage_field()]
def _get_contract_wage_field(self):
self.ensure_one()
return 'wage'
def write(self, vals):
    """Write ``vals``, then keep employees and related values in sync.

    After the regular write, in this order:

    * contracts set to ``open`` become their employee's current contract;
    * when an employee's current contract leaves the ``open`` state, another
      running contract of the same employee and company that covers today
      (if any) becomes the current one;
    * contracts set to ``close`` without an end date get one (today, or the
      start date if it is later);
    * unless the ``close_contract`` context key is falsy, writing an end
      date in the past closes the contracts that are still open;
    * a new working schedule is propagated to the employees of running (or
      sole incoming) contracts that already have a schedule;
    * a state change without an explicit kanban state resets the kanban
      state to ``normal``.

    :return: the result of the parent ``write``
    """
    state_before = {contract.id: contract.state for contract in self}
    result = super().write(vals)
    state_after = {contract.id: contract.state for contract in self}

    if vals.get('state') == 'open':
        self._assign_open_contract()

    today = fields.Date.today()

    def _covers_today(candidate):
        return candidate.date_start <= today and (not candidate.date_end or candidate.date_end >= today)

    for contract in self:
        current_contract_left_open = (
            contract == contract.sudo().employee_id.contract_id
            and state_before[contract.id] == 'open'
            and state_after[contract.id] != 'open'
        )
        if not current_contract_left_open:
            continue
        running_contract = self.env['hr.contract'].search([
            ('employee_id', '=', contract.employee_id.id),
            ('company_id', '=', contract.company_id.id),
            ('state', '=', 'open'),
        ]).filtered(_covers_today)
        if running_contract:
            contract.employee_id.sudo().contract_id = running_contract[0]

    if vals.get('state') == 'close':
        for contract in self.filtered(lambda c: not c.date_end):
            contract.date_end = max(date.today(), contract.date_start)

    date_end = vals.get('date_end')
    closing_allowed = self.env.context.get('close_contract', True)
    if closing_allowed and date_end and fields.Date.from_string(date_end) < fields.Date.context_today(self):
        for contract in self.filtered(lambda c: c.state == 'open'):
            contract.state = 'close'

    calendar = vals.get('resource_calendar_id')
    if calendar:
        def _drives_employee_calendar(c):
            # Running contract, or the employee's only contract while incoming.
            return c.state == 'open' or (
                c.state == 'draft' and c.kanban_state == 'done' and c.employee_id.contracts_count == 1
            )

        employees = self.filtered(_drives_employee_calendar).mapped('employee_id')
        employees.filtered(lambda e: e.resource_calendar_id).write({'resource_calendar_id': calendar})

    if 'state' in vals and 'kanban_state' not in vals:
        self.write({'kanban_state': 'normal'})

    return result
@api.model_create_multi
def create(self, vals_list):
    """Create contracts and synchronise their employees.

    Contracts created directly in the ``open`` state become their
    employee's current contract. For running contracts, and for incoming
    contracts (draft + green) that are the employee's only contract, the
    contract's working schedule is copied onto the employee.

    :return: the created contracts
    """
    def _is_open(contract):
        return contract.state == 'open'

    def _drives_employee_calendar(contract):
        return _is_open(contract) or (
            contract.state == 'draft'
            and contract.kanban_state == 'done'
            and contract.employee_id.contracts_count == 1
        )

    contracts = super().create(vals_list)
    contracts.filtered(_is_open)._assign_open_contract()
    open_contracts = contracts.filtered(_drives_employee_calendar)
    # sync contract calendar -> calendar employee
    to_sync = open_contracts.filtered(lambda c: c.employee_id and c.resource_calendar_id)
    for contract in to_sync:
        contract.employee_id.resource_calendar_id = contract.resource_calendar_id
    return contracts
def _track_subtype(self, init_values):
self.ensure_one()
if 'state' in init_values and self.state == 'open' and 'kanban_state' in init_values and self.kanban_state == 'blocked':
return self.env.ref('hr_contract.mt_contract_pending')
elif 'state' in init_values and self.state == 'close':
return self.env.ref('hr_contract.mt_contract_close')
return super(Contract, self)._track_subtype(init_values)
def _is_struct_from_country(self, country_code):
self.ensure_one()
self_sudo = self.sudo()
return self_sudo.structure_type_id and self_sudo.structure_type_id.country_id.code == country_code
def action_open_contract_form(self):
    """Return the contract window action, restricted to this record's form.

    Based on ``hr_contract.action_hr_contract`` with the view forced to
    ``hr_contract.hr_contract_view_form`` and ``res_id`` set to this
    contract.
    """
    self.ensure_one()
    action = self.env['ir.actions.actions']._for_xml_id('hr_contract.action_hr_contract')
    form_view_id = self.env.ref('hr_contract.hr_contract_view_form').id
    action['view_mode'] = 'form'
    action['view_id'] = form_view_id
    action['views'] = [(form_view_id, 'form')]
    action['res_id'] = self.id
    return action
def action_open_contract_history(self):
    """Return the contract history form action for this contract's employee.

    Based on ``hr_contract.hr_contract_history_view_form_action`` with
    ``res_id`` set to the employee's id.
    """
    self.ensure_one()
    history_action = self.env["ir.actions.actions"]._for_xml_id(
        'hr_contract.hr_contract_history_view_form_action'
    )
    history_action.update(res_id=self.employee_id.id)
    return history_action
def action_open_contract_list(self):
    """Return the contract action filtered on this contract's employee.

    Based on ``hr_contract.action_hr_contract``; the domain restricts to
    the employee, the views are list, kanban, activity and form, and new
    records default to that employee.
    """
    self.ensure_one()
    action = self.env["ir.actions.actions"]._for_xml_id('hr_contract.action_hr_contract')
    action['domain'] = [('employee_id', '=', self.employee_id.id)]
    action['views'] = [[False, view_type] for view_type in ('list', 'kanban', 'activity', 'form')]
    action['context'] = {'default_employee_id': self.employee_id.id}
    return action