from common.models import * # from decimal import Decimal from utils.Logger import Logger from utils.JDManipulator import * from utils.highPrecision import * """ Simple class that represents an interval of time Julian days should be used """ class Interval: def __init__(self, start, end): self._start = Decimal(start) self._end = Decimal(end) self.duration = self._end - self._start def __str__(self): print("[" + str(self.start) + " - " + str(self.end) + "]") def _get_start(self): return self._start def _set_start(self, start): if start > self._end: raise ValueError( "Cannot set start (%d): must be lower than end (%d)" % (start, self._end)) self._start = start self.duration = self._end - self._start def _get_end(self): return self._end def _set_end(self, end): if end < self._start: raise ValueError( "Cannot set end (%d): must be bigger than start (%d)" % (end, self._start)) self._end = end self.duration = self._end - self._start start = property(fget=_get_start, fset=_set_start) end = property(fget=_get_end, fset=_set_end) ''' Class that allows to manage intervals Only the Scheduler inherit from IntervalManagement ''' class IntervalManagement(Logger): def __init__(self, name: str = "IntervalManagement", file: str = "IntervalManagement"): super().__init__(name, file) self.max_overhead_seconds = 10 self.max_overhead = Decimal(self.max_overhead_seconds / JD_VALUE) self.intervals = [] def getMatchingIntervals(self, sequence: Sequence): matching_intervals = [] for interval in self.intervals: overlap = min(sequence.jd2, interval.end) - max(sequence.jd1, interval.start) - self.max_overhead if overlap > sequence.duration or is_nearby_equal(overlap, sequence.duration): matching_intervals.append(interval) return matching_intervals def getPreferredInterval(self, sequence: Sequence, matching_intervals) -> Interval: if sequence.t_prefered == 0 or len(matching_intervals) == 1: return matching_intervals[0] else: for index, interval in enumerate(matching_intervals): if is_between(interval.start, sequence.t_prefered, interval.end): return interval elif sequence.t_prefered < interval.start: if index == 0: pref_int = interval elif interval.start + self.max_overhead - sequence.t_prefered < sequence.t_prefered - matching_intervals[index - 1].end: pref_int = interval else: pref_int = matching_intervals[index - 1] return pref_int return matching_intervals[-1] def getSequencePositionInInterval(self, sequence: Sequence, interval: Interval) -> str: if is_between(interval.start, sequence.t_prefered, interval.end): if is_nearby_less_or_equal(sequence.t_prefered - Decimal(0.5) * sequence.duration, interval.start): position_in_interval = "START" elif is_nearby_sup_or_equal(sequence.t_prefered + Decimal(0.5) * sequence.duration, interval.end): position_in_interval = "END" else: position_in_interval = "PREFERRED" else: if sequence.t_prefered < interval.start: position_in_interval = "START" else: position_in_interval = "END" return position_in_interval def insertSequenceInInterval(self, sequence: Sequence, shs: ScheduleHasSequences, interval: Interval, position: str) -> int: if position not in ["START", "END", "PREFERRED"]: return 1 if position == "END": shs.tep = min(interval.end, sequence.jd2) shs.tsp = shs.tep - sequence.duration shs.deltaTL = shs.tsp - max(interval.start + self.max_overhead, sequence.jd1) shs.deltaTR = Decimal(0) elif position == "PREFERRED": shs.tsp = max(sequence.jd1, sequence.t_prefered - Decimal(0.5) * sequence.duration) if shs.tsp - interval.start < self.max_overhead: shs.tsp = interval.start + self.max_overhead shs.tep = shs.tsp + sequence.duration shs.deltaTL = shs.tsp - max(interval.start + self.max_overhead, sequence.jd1) shs.deltaTR = min(interval.end, sequence.jd2) - shs.tep else: shs.tsp = max(interval.start + self.max_overhead, sequence.jd1) shs.tep = shs.tsp + sequence.duration shs.deltaTL = 0 shs.deltaTR = min(interval.end, sequence.jd2) - shs.tep return 0 def placeSequence(self, sequence: Sequence, shs: ScheduleHasSequences, matching_invervals): if len(matching_invervals) == 0: return None interval = self.getPreferredInterval(sequence, matching_invervals) if not interval: return None sequence_pos = self.getSequencePositionInInterval(sequence, interval) self.insertSequenceInInterval(sequence, shs, interval, sequence_pos) self.cutInterval(sequence, shs, interval) return interval def cutInterval(self, sequence: Sequence, shs: ScheduleHasSequences, interval: Interval) -> int: inter_before = Interval(interval.start, shs.tsp - self.max_overhead) inter_after = Interval(shs.tep, interval.end) self.intervals.remove(interval) if inter_before.duration > 0: self.intervals.append(inter_before) if inter_after.duration > 0: self.intervals.append(inter_after) self.intervals.sort(key=lambda interval: interval.start, reverse=False) return 0 def getPotentialIntervals(self, sequence: Sequence): potential_intervals = [] for interval in self.intervals: overlap = min(sequence.jd2, interval.end) - max(sequence.jd1, interval.start) - self.max_overhead if overlap > 0: potential_intervals.append(interval) return potential_intervals