From 6eba0435826dca0415390f6c36652d647841e0f8 Mon Sep 17 00:00:00 2001 From: Kenneth Reitz Date: Tue, 23 May 2017 10:49:36 -0700 Subject: [PATCH] amazing MayaInterval class, compliments of @dgouldin :) --- maya.py | 291 ++++++++++++++++++++++- setup.py | 2 +- t.py | 7 + test_maya.py | 19 ++ test_maya_interval.py | 538 ++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 849 insertions(+), 8 deletions(-) create mode 100644 t.py create mode 100755 test_maya_interval.py diff --git a/maya.py b/maya.py index b98c989..d250587 100644 --- a/maya.py +++ b/maya.py @@ -19,6 +19,7 @@ import dateparser import pendulum from tzlocal import get_localzone + _EPOCH_START = (1970, 1, 1) @@ -41,7 +42,6 @@ def validate_class_type_arguments(operator): return inner - class MayaDT(object): """The Maya Datetime object.""" @@ -59,7 +59,6 @@ class MayaDT(object): """Return's the datetime's format""" return format(self.datetime(), *args, **kwargs) - @validate_class_type_arguments('==') def __eq__(self, maya_dt): return self._epoch == maya_dt._epoch @@ -84,6 +83,19 @@ class MayaDT(object): def __ge__(self, maya_dt): return self._epoch >= maya_dt._epoch + def __hash__(self): + return hash(self.epoch) + + def __add__(self, item): + return self.add(seconds=seconds_or_timedelta(item).total_seconds()) + + def __radd__(self, item): + return self + item + + def __sub__(self, item): + return self.subtract( + seconds=seconds_or_timedelta(item).total_seconds()) + def add(self, **kwargs): """"Returns a new MayaDT object with the given offsets.""" return self.from_datetime(pendulum.instance(self.datetime()).add(**kwargs)) @@ -187,7 +199,7 @@ class MayaDT(object): def rfc3339(self): """Returns an RFC 3339 representation of the MayaDT.""" - return self.datetime().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-4]+"Z" + return self.datetime().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-4] + "Z" # Properties # ---------- @@ -247,6 +259,262 @@ class MayaDT(object): return humanize.naturaltime(dt) +def to_utc_offset_naive(dt): + if dt.tzinfo is None: + return dt + return dt.astimezone(pytz.utc).replace(tzinfo=None) + + +def to_utc_offset_aware(dt): + if dt.tzinfo is not None: + return dt + return pytz.utc.localize(dt) + + +def to_iso8601(dt): + return to_utc_offset_naive(dt).isoformat() + 'Z' + + +def end_of_day_midnight(dt): + return dt if dt.time() == time.min else\ + (dt.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)) + + +class MayaInterval(object): + """ + A MayaInterval represents a range between two datetimes, inclusive of the start + and exclusive of the end. + """ + def __init__(self, start=None, end=None, duration=None): + try: + # Ensure that proper arguments were passed. + assert any(( + (start and end), + (start and duration is not None), + (end and duration is not None), + )) + assert not all((start, end, duration is not None)) + except AssertionError: + raise ValueError( + 'Exactly 2 of start, end, and duration must be specified') + + # Convert duration to timedelta if seconds were provided. + duration = seconds_or_timedelta(duration) + + if not start: + start = end - duration + + if not end: + end = start + duration + + if start > end: + raise ValueError('MayaInterval cannot end before it starts') + + self.start = start + self.end = end + + def __repr__(self): + return ''.format(self.start, self.end) + + def iso8601(self): + """Returns an ISO 8601 representation of the MayaInterval.""" + return '{0}/{1}'.format(self.start.iso6801, self.end.iso8601) + + @classmethod + def from_iso8601(cls, s): + # # Start and end, such as "2007-03-01T13:00:00Z/2008-05-11T15:30:00Z" + # start, end = s.split('/') + # try: + # start = parse(start) + # except pendulum.parsing.exceptions.ParserError: + # start = self._parse_iso8601_duration(start) + # try: + # end = parse(start) + # except pendulum.parsing.exceptions.ParserError as e: + # end = self._parse_iso8601_duration(start) + + # # Start and duration, such as "2007-03-01T13:00:00Z/P1Y2M10DT2H30M" + # # Duration and end, such as "P1Y2M10DT2H30M/2008-05-11T15:30:00Z" + raise NotImplentedError() + + + def __and__(self, i): + return self.intersection(i) + + def __or__(self, i): + return self.combine(i) + + def __eq__(self, i): + return ( + self.start == i.start and + self.end == i.end + ) + + def __hash__(self): + return hash((self.start, self.end)) + + def __iter__(self): + yield self.start + yield self.end + + def __cmp__(self, i): + return ( + cmp(self.start, i.start) or + cmp(self.end, i.end) + ) + + @property + def duration(self): + return self.timedelta.total_seconds() + + @property + def timedelta(self): + return timedelta(seconds=(self.end.epoch - self.start.epoch)) + + @property + def is_instant(self): + return self.timedelta == timedelta(seconds=0) + + def intersects(self, i): + return self & i is not None + + @property + def midpoint(self): + return self.start.add(seconds=(self.duration / 2)) + + def combine(self, i): + """Returns a combined list of timespans, merged together.""" + ii = sorted([self, i]) + if self & i or self.is_adjacent(i): + return [ + MayaInterval( + ii[0].start, + max(ii[0].end, ii[1].end), + ), + ] + return ii + + def subtract(self, i): + """"Removes the given inerval.""" + if not self & i: + return [self] + elif i.contains(self): + return [] + + ii = [] + if self.start < i.start: + ii.append(MayaInterval(self.start, i.start)) + if self.end > i.end: + ii.append(MayaInterval(i.end, self.end)) + return ii + + def split(self, duration, include_remainder=True): + + # Convert seconds to timedelta, if appropriate. + duration = seconds_or_timedelta(duration) + + assert duration > timedelta(seconds=0), 'cannot call split with a non-positive timedelta' + start = self.start + while start < self.end: + if start + duration <= self.end: + yield MayaInterval(start, start + duration) + elif include_remainder: + yield MayaInterval(start, self.end) + start += duration + + def quantize(self, duration, snap_out=False, timezone='UTC'): + """Returns a quanitzed interval.""" + + # Convert seconds to timedelta, if appropriate. + duration = seconds_or_timedelta(duration) + timezone = pytz.timezone(timezone) + + assert duration > timedelta(seconds=0), 'cannot quantize by non-positive timedelta' + epoch = timezone.localize(Datetime(1970, 1, 1)) + seconds = int(duration.total_seconds()) + + start_seconds = int((self.start.datetime(naive=False) - epoch).total_seconds()) + end_seconds = int((self.end.datetime(naive=False) - epoch).total_seconds()) + + if start_seconds % seconds and not snap_out: + start_seconds += seconds + if end_seconds % seconds and snap_out: + end_seconds += seconds + + start_seconds -= start_seconds % seconds + end_seconds -= end_seconds % seconds + + if start_seconds > end_seconds: + start_seconds = end_seconds + + return MayaInterval( + start=MayaDT.from_datetime(epoch).add(seconds=start_seconds), + end=MayaDT.from_datetime(epoch).add(seconds=end_seconds), + ) + + def intersection(self, i): + """Returns the intersection between two intervals.""" + + start = max(self.start, i.start) + end = min(self.end, i.end) + + either_instant = self.is_instant or i.is_instant + instant_overlap = ( + self.start == i.start or + start <= end + ) + if ((either_instant and instant_overlap) or (start < end)): + return MayaInterval(start, end) + + def contains(self, i): + return ( + self.start <= i.start and + self.end >= i.end + ) + + def __contains__(self, item): + if isinstance(item, MayaDT): + return self.contains_dt(item) + + return item.contains(self) + + def contains_dt(self, dt): + return self.start <= dt < self.end + + def is_adjacent(self, i): + return ( + self.start == i.end or + self.end == i.start + ) + + @property + def icalendar(self): + ical_dt_format = '%Y%m%dT%H%M%SZ' + return """ + BEGIN:VCALENDAR + VERSION:2.0 + BEGIN:VEVENT + DTSTART:{0} + DTEND:{1} + END:VEVENT + END:VCALENDAR + """.format( + self.start.datetime().strftime(ical_dt_format), + self.end.datetime().strftime(ical_dt_format), + ).replace(' ', '').strip('\r\n').replace('\n', '\r\n') + + @staticmethod + def flatten(ii): + return reduce(lambda reduced, i: ( + (reduced[:-1] + i.combine(reduced[-1])) + if reduced else [i] + ), sorted(ii), []) + + @classmethod + def from_datetime(cls, start_dt=None, end_dt=None, duration=None): + start = MayaDT.from_datetime(start_dt) if start_dt else None + end = MayaDT.from_datetime(end_dt) if end_dt else None + return cls(start=start, end=end, duration=duration) def now(): @@ -254,6 +522,7 @@ def now(): epoch = time.time() return MayaDT(epoch=epoch) + def when(string, timezone='UTC'): """"Returns a MayaDT instance for the human moment specified. @@ -274,6 +543,7 @@ def when(string, timezone='UTC'): return MayaDT.from_datetime(dt) + def parse(string, day_first=False): """"Returns a MayaDT instance for the machine-produced moment specified. @@ -286,15 +556,22 @@ def parse(string, day_first=False): dt = pendulum.parse(string, day_first=day_first) return MayaDT.from_datetime(dt) + +def seconds_or_timedelta(s): + + # Convert seconds into timedelta. + if isinstance(s, int): + s = timedelta(seconds=s) + + return s + + def intervals(start, end, interval): """Yields MayaDT objects between the start and end MayaDTs given, at a given interval (seconds or timedelta).""" - # Convert seconds into timedelta. - if isinstance(interval, int): - interval = timedelta(seconds=interval) + interval = seconds_or_timedelta(interval) current_timestamp = start while current_timestamp.epoch < end.epoch: yield current_timestamp current_timestamp = current_timestamp.add(seconds=interval.seconds) - diff --git a/setup.py b/setup.py index 7929655..952e6fe 100755 --- a/setup.py +++ b/setup.py @@ -35,7 +35,7 @@ required = [ setup( name='maya', - version='0.1.9', + version='0.3.0', description='Datetimes for Humans.', long_description=long_description, author='Kenneth Reitz', diff --git a/t.py b/t.py new file mode 100644 index 0000000..ba4ac5d --- /dev/null +++ b/t.py @@ -0,0 +1,7 @@ +import maya + +a1 = maya.now() +a2 = a1.add(hours=1) + +i = maya.MayaInterval(a1, a2) +print i \ No newline at end of file diff --git a/test_maya.py b/test_maya.py index 9f6c296..3030a28 100644 --- a/test_maya.py +++ b/test_maya.py @@ -1,5 +1,6 @@ import pytest import copy +from datetime import timedelta import maya @@ -208,3 +209,21 @@ def test_intervals(): tomorrow = now.add(days=1) assert len(list(maya.intervals(now, tomorrow, 60*60))) == 24 + + +def test_dunder_add(): + now = maya.now() + assert now + 1 == now.add(seconds=1) + assert now + timedelta(seconds=1) == now.add(seconds=1) + + +def test_dunder_radd(): + now = maya.now() + assert now.add(seconds=1) == now + 1 + assert now.add(seconds=1) == now + timedelta(seconds=1) + + +def test_dunder_sub(): + now = maya.now() + assert now - 1 == now.subtract(seconds=1) + assert now - timedelta(seconds=1) == now.subtract(seconds=1) diff --git a/test_maya_interval.py b/test_maya_interval.py new file mode 100755 index 0000000..0cd1623 --- /dev/null +++ b/test_maya_interval.py @@ -0,0 +1,538 @@ +import random +from datetime import datetime, timedelta + +import pytest +import pytz + +import maya + + +Los_Angeles = pytz.timezone('America/Los_Angeles') +New_York = pytz.timezone('America/New_York') +Melbourne = pytz.timezone('Australia/Melbourne') + + +def test_interval_requires_2_of_start_end_duration(): + start = maya.now() + end = start.add(hours=1) + + with pytest.raises(ValueError): + maya.MayaInterval(start=start) + + with pytest.raises(ValueError): + maya.MayaInterval(end=end) + + with pytest.raises(ValueError): + maya.MayaInterval(duration=60) + + with pytest.raises(ValueError): + maya.MayaInterval(start=start, end=end, duration=60) + + maya.MayaInterval(start=start, end=end) + maya.MayaInterval(start=start, duration=60) + maya.MayaInterval(end=end, duration=60) + + +def test_interval_requires_end_time_after_or_on_start_time(): + maya.MayaInterval(start=maya.now(), duration=0) + + with pytest.raises(ValueError): + maya.MayaInterval(start=maya.now(), duration=-1) + + +def test_interval_init_start_end(): + start = maya.now() + end = start.add(hours=1) + interval = maya.MayaInterval(start=start, end=end) + assert interval.start == start + assert interval.end == end + + +def test_interval_init_start_duration(): + start = maya.now() + duration = 1 + interval = maya.MayaInterval(start=start, duration=duration) + assert interval.start == start + assert interval.end == start.add(seconds=duration) + + +def test_interval_init_end_duration(): + end = maya.now() + duration = 1 + interval = maya.MayaInterval(end=end, duration=duration) + assert interval.end == end + assert interval.start == end.subtract(seconds=duration) + + +@pytest.mark.parametrize('start_doy1,end_doy1,start_doy2,end_doy2,intersection_doys', ( + (0, 2, 1, 3, (1, 2)), + (0, 2, 3, 4, None), + (0, 2, 2, 3, None), + (0, 1, 0, 1, (0, 1)), + (1, 1, 1, 3, (1, 1)), + (1, 1, 1, 1, (1, 1)), + (1, 1, 2, 3, None), + (2, 2, 1, 3, (2, 2)), + (1, 3, 1, 1, (1, 1)), + (2, 3, 1, 1, None), + (1, 3, 2, 2, (2, 2)), +), ids=( + 'overlapping', + 'non-overlapping', + 'adjacent', + 'equal', + 'instant overlapping start only', + 'instant equal', + 'instant disjoint', + 'instant overlapping', + 'instant overlapping start only (left)', + 'instant disjoint (left)', + 'instant overlapping (left)' +)) +def test_interval_intersection( + start_doy1, end_doy1, start_doy2, end_doy2, intersection_doys +): + base = maya.MayaDT.from_datetime(datetime(2016, 1, 1)) + interval1 = maya.MayaInterval( + base.add(days=start_doy1), + base.add(days=end_doy1), + ) + interval2 = maya.MayaInterval( + base.add(days=start_doy2), + base.add(days=end_doy2), + ) + + if intersection_doys: + start_doy_intersection, end_doy_intersection = intersection_doys + assert interval1 & interval2 == maya.MayaInterval( + base.add(days=start_doy_intersection), + base.add(days=end_doy_intersection), + ) + else: + assert (interval1 & interval2) is None + + +def test_interval_intersects(): + base = maya.MayaDT.from_datetime(datetime(2016, 1, 1)) + interval = maya.MayaInterval(base, base.add(days=1)) + + assert interval.intersects(interval) + assert not interval.intersects(maya.MayaInterval( + base.add(days=2), + base.add(days=3), + )) + + +def test_and_operator(): + base = maya.MayaDT.from_datetime(datetime(2016, 1, 1)) + interval1 = maya.MayaInterval(base, base.add(days=2)) + interval2 = maya.MayaInterval(base.add(days=1), base.add(days=3)) + + assert ( + interval1 & interval2 == + interval2 & interval1 == + interval1.intersection(interval2) + ) + + +def test_interval_eq_operator(): + start = maya.now() + end = start.add(hours=1) + interval = maya.MayaInterval(start=start, end=end) + + assert interval == maya.MayaInterval(start=start, end=end) + assert interval != maya.MayaInterval(start=start, end=end.add(days=1)) + + +def test_interval_timedelta(): + start = maya.now() + delta = timedelta(hours=1) + interval = maya.MayaInterval(start=start, duration=delta) + + assert interval.timedelta == delta + + +def test_interval_duration(): + start = maya.now() + delta = timedelta(hours=1) + interval = maya.MayaInterval(start=start, duration=delta) + + assert interval.duration == delta.total_seconds() + + +@pytest.mark.parametrize('start_doy1,end_doy1,start_doy2,end_doy2,expected', ( + (0, 2, 1, 3, False), + (0, 2, 3, 4, False), + (0, 2, 2, 3, False), + (0, 1, 0, 1, True), + (0, 3, 1, 2, True), +), ids=( + 'overlapping', + 'non-overlapping', + 'adjacent', + 'equal', + 'subset', +)) +def test_interval_contains( + start_doy1, end_doy1, start_doy2, end_doy2, expected +): + base = maya.MayaDT.from_datetime(datetime(2016, 1, 1)) + interval1 = maya.MayaInterval( + base.add(days=start_doy1), + base.add(days=end_doy1), + ) + interval2 = maya.MayaInterval( + base.add(days=start_doy2), + base.add(days=end_doy2), + ) + + assert interval1.contains(interval2) is expected + assert (interval1 in interval2) is expected + + +@pytest.mark.parametrize('start_doy,end_doy,dt_doy,expected', ( + (2, 4, 1, False), + (2, 4, 2, True), + (2, 4, 3, True), + (2, 4, 4, False), + (2, 4, 5, False), +), ids=( + 'before-start', + 'on-start', + 'during', + 'on-end', + 'after-end', +)) +def test_interval_in_operator_maya_dt( + start_doy, end_doy, dt_doy, expected +): + base = maya.MayaDT.from_datetime(datetime(2016, 1, 1)) + interval = maya.MayaInterval( + start=base.add(days=start_doy), + end=base.add(days=end_doy), + ) + dt = base.add(days=dt_doy) + + assert (dt in interval) is expected + + +def test_interval_hash(): + start = maya.now() + end = start.add(hours=1) + interval = maya.MayaInterval(start=start, end=end) + + assert hash(interval) == hash(maya.MayaInterval(start=start, end=end)) + assert hash(interval) != hash(maya.MayaInterval( + start=start, end=end.add(days=1))) + + +def test_interval_iter(): + start = maya.now() + end = start.add(days=1) + + assert tuple(maya.MayaInterval(start=start, end=end)) == (start, end) + + +@pytest.mark.parametrize('start1,end1,start2,end2,expected', [ + (1, 2, 1, 2, 0), + (1, 3, 2, 4, -1), + (2, 4, 1, 3, 1), + (1, 2, 1, 3, -1), +], ids=( + 'equal', + 'less-than', + 'greater-than', + 'use-end-time-if-start-time-identical', +)) +def test_interval_cmp(start1, end1, start2, end2, expected): + base = maya.now() + interval1 = maya.MayaInterval( + start=base.add(days=start1), + end=base.add(days=end1), + ) + interval2 = maya.MayaInterval( + start=base.add(days=start2), + end=base.add(days=end2), + ) + assert cmp(interval1, interval2) == expected + + +@pytest.mark.parametrize('start1,end1,start2,end2,expected', [ + (1, 2, 2, 3, [(1, 3)]), + (1, 3, 2, 4, [(1, 4)]), + (1, 2, 3, 4, [(1, 2), (3, 4)]), + (1, 5, 2, 3, [(1, 5)]), +], ids=( + 'adjacent', + 'overlapping', + 'non-overlapping', + 'contains', +)) +def test_interval_combine(start1, end1, start2, end2, expected): + base = maya.now() + interval1 = maya.MayaInterval( + start=base.add(days=start1), + end=base.add(days=end1), + ) + interval2 = maya.MayaInterval( + start=base.add(days=start2), + end=base.add(days=end2), + ) + expected_intervals = [maya.MayaInterval( + start=base.add(days=start), + end=base.add(days=end), + ) for start, end in expected] + + assert interval1.combine(interval2) == expected_intervals + assert interval2.combine(interval1) == expected_intervals + + +@pytest.mark.parametrize('start1,end1,start2,end2,expected', [ + (1, 2, 3, 4, [(1, 2)]), + (1, 2, 2, 4, [(1, 2)]), + (2, 3, 1, 4, []), + (1, 4, 2, 3, [(1, 2), (3, 4)]), + (1, 4, 0, 2, [(2, 4)]), + (1, 4, 3, 5, [(1, 3)]), + (1, 4, 1, 2, [(2, 4)]), + (1, 4, 3, 4, [(1, 3)]), +], ids=( + 'non-overlapping', + 'adjacent', + 'contains', + 'splits', + 'overlaps-left', + 'overlaps-right', + 'overlaps-left-identical-start', + 'overlaps-right-identical-end', +)) +def test_interval_subtract(start1, end1, start2, end2, expected): + base = maya.now() + interval1 = maya.MayaInterval( + start=base.add(days=start1), + end=base.add(days=end1), + ) + interval2 = maya.MayaInterval( + start=base.add(days=start2), + end=base.add(days=end2), + ) + expected_intervals = [maya.MayaInterval( + start=base.add(days=start), + end=base.add(days=end), + ) for start, end in expected] + + assert interval1.subtract(interval2) == expected_intervals + + +@pytest.mark.parametrize('start1,end1,start2,end2,expected', [ + (1, 2, 2, 3, True), + (2, 3, 1, 2, True), + (1, 3, 2, 3, False), + (2, 3, 4, 5, False), +], ids=( + 'adjacent-right', + 'adjacent-left', + 'overlapping', + 'non-overlapping', +)) +def test_interval_is_adjacent(start1, end1, start2, end2, expected): + base = maya.now() + interval1 = maya.MayaInterval( + start=base.add(days=start1), + end=base.add(days=end1), + ) + interval2 = maya.MayaInterval( + start=base.add(days=start2), + end=base.add(days=end2), + ) + assert interval1.is_adjacent(interval2) == expected + + +@pytest.mark.parametrize('start,end,delta,include_remainder,expected', [ + (0, 10, 5, False, [(0, 5), (5, 10)]), + (0, 10, 5, True, [(0, 5), (5, 10)]), + (0, 10, 3, False, [(0, 3), (3, 6), (6, 9)]), + (0, 10, 3, True, [(0, 3), (3, 6), (6, 9), (9, 10)]), + (0, 2, 5, False, []), + (0, 2, 5, True, [(0, 2)]), +], ids=( + 'even-split', + 'even-split-include-partial', + 'uneven-split-do-not-include-partial', + 'uneven-split-include-partial', + 'delta-larger-than-timepsan-do-not-include-partial', + 'delta-larger-than-timepsan-include-partial', +)) +def test_interval_split(start, end, delta, include_remainder, expected): + base = maya.now() + interval = maya.MayaInterval( + start=base.add(days=start), + end=base.add(days=end), + ) + delta = timedelta(days=delta) + + expected_intervals = [ + maya.MayaInterval( + start=base.add(days=s), + end=base.add(days=e), + ) for s, e in expected + ] + + assert expected_intervals == list(interval.split( + delta, include_remainder=include_remainder)) + + +def test_interval_split_non_positive_delta(): + start = maya.now() + end = start.add(days=1) + interval = maya.MayaInterval(start=start, end=end) + + with pytest.raises(AssertionError): + list(interval.split(timedelta(seconds=0))) + + with pytest.raises(AssertionError): + list(interval.split(timedelta(seconds=-10))) + + +@pytest.mark.parametrize('start,end,minutes,timezone,snap_out,expected_start,expected_end', [ + ((5, 12), (8, 48), 30, None, False, (5, 30), (8, 30)), + ((5, 12), (8, 48), 30, None, True, (5, 0), (9, 0)), + ((5, 15), (9, 0), 15, None, False, (5, 15), (9, 0)), + ((5, 15), (9, 0), 15, None, True, (5, 15), (9, 0)), + ((6, 50), (9, 15), 60, 'America/New_York', False, (7, 0), (9, 0)), + ((6, 50), (9, 15), 60, 'America/New_York', True, (6, 0), (10, 0)), + ((6, 20), (6, 50), 60, None, False, (6, 0), (6, 0)), + ((6, 20), (6, 50), 60, None, True, (6, 0), (7, 0)), + ((6, 20), (6, 50), 60, 'America/Chicago', False, (6, 0), (6, 0)), + ((6, 20), (6, 50), 60, 'America/Chicago', True, (6, 0), (7, 0)), +], ids=( + 'normal', + 'normal-snap_out', + 'already-quantized', + 'already-quantized-snap_out', + 'with-timezone', + 'with-timezone-snap_out', + 'too-small', + 'too-small-snap_out', + 'too-small-with-timezone', + 'too-small-with-timezone-snap_out', +)) +def test_quantize(start, end, minutes, timezone, snap_out, expected_start, expected_end): + base = maya.MayaDT.from_datetime(datetime(2017, 1, 1)) + interval = maya.MayaInterval( + start=base.add(hours=start[0], minutes=start[1]), + end=base.add(hours=end[0], minutes=end[1]), + ) + + kwargs = {'timezone': timezone} if timezone is not None else {} + quantized_interval = interval.quantize( + timedelta(minutes=minutes), + snap_out=snap_out, + **kwargs + ) + + assert quantized_interval == maya.MayaInterval( + start=base.add(hours=expected_start[0], minutes=expected_start[1]), + end=base.add(hours=expected_end[0], minutes=expected_end[1]), + ) + + +def test_quantize_invalid_delta(): + start = maya.now() + end = start.add(days=1) + interval = maya.MayaInterval(start=start, end=end) + + with pytest.raises(AssertionError): + interval.quantize(timedelta(minutes=0)) + with pytest.raises(AssertionError): + interval.quantize(timedelta(minutes=-1)) + + +def test_interval_flatten_non_overlapping(): + step = 2 + max_hour = 20 + base = maya.now() + intervals = [maya.MayaInterval( + start=base.add(hours=hour), + duration=timedelta(hours=step - 1), + ) for hour in range(0, max_hour, step)] + random.shuffle(intervals) + + assert maya.MayaInterval.flatten(intervals) == sorted(intervals) + + +def test_interval_flatten_adjacent(): + step = 2 + max_hour = 20 + base = maya.now() + intervals = [maya.MayaInterval( + start=base.add(hours=hour), + duration=timedelta(hours=step), + ) for hour in range(0, max_hour, step)] + random.shuffle(intervals) + + assert maya.MayaInterval.flatten(intervals) == [maya.MayaInterval( + start=base, + duration=timedelta(hours=max_hour), + )] + + +def test_interval_flatten_intersecting(): + step = 2 + max_hour = 20 + base = maya.now() + intervals = [maya.MayaInterval( + start=base.add(hours=hour), + duration=timedelta(hours=step, minutes=30), + ) for hour in range(0, max_hour, step)] + random.shuffle(intervals) + + assert maya.MayaInterval.flatten(intervals) == [maya.MayaInterval( + start=base, + duration=timedelta(hours=max_hour, minutes=30), + )] + + +def test_interval_flatten_containing(): + step = 2 + max_hour = 20 + base = maya.now() + containing_interval = maya.MayaInterval( + start=base, + end=base.add(hours=max_hour + step), + ) + intervals = [maya.MayaInterval( + start=base.add(hours=hour), + duration=timedelta(hours=step - 1), + ) for hour in range(2, max_hour, step)] + intervals.append(containing_interval) + random.shuffle(intervals) + + assert maya.MayaInterval.flatten(intervals) == [containing_interval] + + +def test_interval_from_datetime(): + start = maya.now() + duration = timedelta(hours=1) + end = start + duration + + interval = maya.MayaInterval.from_datetime( + start_dt=start.datetime(naive=False), + end_dt=end.datetime(naive=False), + ) + assert interval.start == start + assert interval.end == end + + interval2 = maya.MayaInterval.from_datetime( + start_dt=start.datetime(naive=False), + duration=duration, + ) + assert interval2.start == start + assert interval2.end == end + + interval3 = maya.MayaInterval.from_datetime( + end_dt=end.datetime(naive=False), + duration=duration, + ) + assert interval3.start == start + assert interval3.end == end