hr_work_entry_contract/models/hr_work_intervals.py

98 lines
3.5 KiB
Python
Raw Permalink Normal View History

# -*- coding:utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from itertools import chain
def _boundaries(intervals, opening, closing):
""" Iterate on the boundaries of intervals. """
for start, stop, recs in intervals:
if start < stop:
yield (start, opening, recs)
yield (stop, closing, recs)
class WorkIntervals(object):
"""
This class is a modified copy of the ``Intervals`` class in the resource module.
A generic solution to handle intervals should probably be developped the day a similar
class is needed elsewhere.
This implementation differs from the resource implementation in its management
of two continuous intervals. Here, continuous intervals are not merged together
while they are merged in resource.
e.g.:
In resource: (1, 4, rec1) and (4, 10, rec2) are merged into (1, 10, rec1 | rec2)
Here: they remain two different intervals.
To implement this behaviour, the main implementation change is the way boundaries are sorted.
"""
def __init__(self, intervals=()):
self._items = []
if intervals:
# normalize the representation of intervals
append = self._items.append
starts = []
recses = []
for value, flag, recs in sorted(_boundaries(sorted(intervals), 'start', 'stop'), key=lambda i: i[0]):
if flag == 'start':
starts.append(value)
recses.append(recs)
else:
start = starts.pop()
if not starts:
append((start, value, recses[0].union(*recses)))
recses.clear()
def __bool__(self):
return bool(self._items)
def __len__(self):
return len(self._items)
def __iter__(self):
return iter(self._items)
def __reversed__(self):
return reversed(self._items)
def __or__(self, other):
""" Return the union of two sets of intervals. """
return WorkIntervals(chain(self._items, other._items))
def __and__(self, other):
""" Return the intersection of two sets of intervals. """
return self._merge(other, False)
def __sub__(self, other):
""" Return the difference of two sets of intervals. """
return self._merge(other, True)
def _merge(self, other, difference):
""" Return the difference or intersection of two sets of intervals. """
result = WorkIntervals()
append = result._items.append
# using 'self' and 'other' below forces normalization
bounds1 = _boundaries(sorted(self), 'start', 'stop')
bounds2 = _boundaries(sorted(other), 'switch', 'switch')
start = None # set by start/stop
recs1 = None # set by start
enabled = difference # changed by switch
for value, flag, recs in sorted(chain(bounds1, bounds2), key=lambda i: i[0]):
if flag == 'start':
start = value
recs1 = recs
elif flag == 'stop':
if enabled and start < value:
append((start, value, recs1))
start = None
else:
if not enabled and start is not None:
start = value
if enabled and start is not None and start < value:
append((start, value, recs1))
enabled = not enabled
return result