# Interval.py (6.26 KB)
from common.models import *
from seq_submit.models import Sequence, ScheduleHasSequences
# from decimal import Decimal
from utils.Logger import Logger
from utils.JDManipulator import *
from utils.highPrecision import *

"""
    Simple class that represents an interval of time.
    Start and end should be expressed in Julian days.
"""


class Interval:
    """Time interval delimited by ``start`` and ``end``, stored as ``Decimal``.

    ``duration`` is always ``end - start`` and is recomputed whenever one of
    the bounds is reassigned through the ``start`` / ``end`` properties.
    """

    def __init__(self, start, end):
        """Build the interval from any values accepted by ``Decimal()``."""
        self._start = Decimal(start)
        self._end = Decimal(end)
        self.duration = self._end - self._start

    def __str__(self):
        # __str__ must *return* the text; printing here and returning None
        # makes str(interval) / print(interval) raise TypeError.
        return "[" + str(self.start) + " - " + str(self.end) + "]"

    def _get_start(self):
        return self._start

    def _set_start(self, start):
        """Set the lower bound.

        Raises:
            ValueError: if ``start`` is greater than the current end.
        """
        # Coerce like __init__ does, so that duration stays a Decimal even
        # when the caller hands in an int or a float.
        start = Decimal(start)
        if start > self._end:
            # %s (not %d) so fractional values are not truncated in the message.
            raise ValueError(
                "Cannot set start (%s): must be lower than end (%s)" % (start, self._end))
        self._start = start
        self.duration = self._end - self._start

    def _get_end(self):
        return self._end

    def _set_end(self, end):
        """Set the upper bound.

        Raises:
            ValueError: if ``end`` is lower than the current start.
        """
        end = Decimal(end)
        if end < self._start:
            raise ValueError(
                "Cannot set end (%s): must be bigger than start (%s)" % (end, self._start))
        self._end = end
        self.duration = self._end - self._start

    start = property(fget=_get_start, fset=_set_start)
    end = property(fget=_get_end, fset=_set_end)

'''
    Class that manages a list of intervals.
    Only the Scheduler inherits from IntervalManagement.
'''


class IntervalManagement(Logger):
    """Keep a list of free intervals and place sequences inside them.

    ``self.intervals`` holds the currently free ``Interval`` objects.
    ``self.max_overhead`` is a margin that is subtracted from every usable
    overlap and kept free in front of each placed sequence.
    """

    def __init__(self, name: str = "IntervalManagement", file: str = "IntervalManagement"):
        super().__init__(name, file)
        self.max_overhead_seconds = 10
        # NOTE(review): JD_VALUE is presumably the number of seconds in one
        # Julian day (converting the overhead to days) - confirm in
        # utils.JDManipulator.
        self.max_overhead = Decimal(self.max_overhead_seconds / JD_VALUE)
        self.intervals = []

    def _usable_overlap(self, sequence, interval):
        """Return the overlap of the sequence window [jd1, jd2] with ``interval``, minus the overhead."""
        return min(sequence.jd2, interval.end) - max(sequence.jd1, interval.start) - self.max_overhead

    def getMatchingIntervals(self, sequence: Sequence):
        """Return the intervals whose usable overlap can hold the whole sequence.

        An interval matches when the overlap is greater than the sequence
        duration, or equal to it according to ``is_nearby_equal``.
        """
        matching_intervals = []
        for interval in self.intervals:
            overlap = self._usable_overlap(sequence, interval)
            if overlap > sequence.duration or is_nearby_equal(overlap, sequence.duration):
                matching_intervals.append(interval)
        return matching_intervals

    def getPreferredInterval(self, sequence: Sequence, matching_intervals) -> Interval:
        """Pick, among ``matching_intervals``, the one best suited to ``sequence.t_prefered``.

        Returns the first interval when ``t_prefered`` is 0 or only one
        candidate exists, the interval containing ``t_prefered`` when there
        is one, otherwise the closer of the two intervals surrounding it
        (or the last interval when ``t_prefered`` lies after all of them).
        Returns ``None`` when ``matching_intervals`` is empty.
        """
        if not matching_intervals:
            # Nothing to choose from; placeSequence already treats a falsy
            # result as "no placement".
            return None
        if sequence.t_prefered == 0 or len(matching_intervals) == 1:
            return matching_intervals[0]

        for index, interval in enumerate(matching_intervals):
            if is_between(interval.start, sequence.t_prefered, interval.end):
                return interval
            if sequence.t_prefered < interval.start:
                # t_prefered falls in the gap before this interval.
                if index == 0:
                    return interval
                previous = matching_intervals[index - 1]
                distance_to_next = interval.start + self.max_overhead - sequence.t_prefered
                distance_to_previous = sequence.t_prefered - previous.end
                if distance_to_next < distance_to_previous:
                    return interval
                return previous
        return matching_intervals[-1]

    def getSequencePositionInInterval(self, sequence: Sequence, interval: Interval) -> str:
        """Return where the sequence goes in ``interval``: "START", "END" or "PREFERRED"."""
        preferred = sequence.t_prefered
        if not is_between(interval.start, preferred, interval.end):
            # Preferred time is outside the interval: stick to the nearest edge.
            return "START" if preferred < interval.start else "END"

        half_duration = Decimal(0.5) * sequence.duration
        # Centering the sequence on t_prefered would cross an edge of the
        # interval: align it on that edge instead.
        if is_nearby_less_or_equal(preferred - half_duration, interval.start):
            return "START"
        if is_nearby_sup_or_equal(preferred + half_duration, interval.end):
            return "END"
        return "PREFERRED"

    def insertSequenceInInterval(self, sequence: Sequence, shs: ScheduleHasSequences,
                                 interval: Interval, position: str) -> int:
        """Fill ``shs.tsp``, ``shs.tep``, ``shs.deltaTL`` and ``shs.deltaTR`` for the given position.

        Returns 0 on success, 1 when ``position`` is not one of
        "START", "END", "PREFERRED" (``shs`` is left untouched in that case).
        """
        if position not in ("START", "END", "PREFERRED"):
            return 1

        # Bounds of the window actually available to the sequence.
        earliest_start = max(interval.start + self.max_overhead, sequence.jd1)
        latest_end = min(interval.end, sequence.jd2)

        if position == "END":
            shs.tep = latest_end
            shs.tsp = shs.tep - sequence.duration
            shs.deltaTL = shs.tsp - earliest_start
            shs.deltaTR = Decimal(0)
        elif position == "PREFERRED":
            # Center the sequence on t_prefered, without starting before jd1
            # or inside the overhead margin at the beginning of the interval.
            shs.tsp = max(sequence.jd1, sequence.t_prefered - Decimal(0.5) * sequence.duration)
            if shs.tsp - interval.start < self.max_overhead:
                shs.tsp = interval.start + self.max_overhead
            shs.tep = shs.tsp + sequence.duration
            shs.deltaTL = shs.tsp - earliest_start
            shs.deltaTR = latest_end - shs.tep
        else:
            shs.tsp = earliest_start
            shs.tep = shs.tsp + sequence.duration
            # Decimal, like deltaTR in the "END" branch, so both deltas
            # always have the same type.
            shs.deltaTL = Decimal(0)
            shs.deltaTR = latest_end - shs.tep
        return 0

    def placeSequence(self, sequence: Sequence, shs: ScheduleHasSequences, matching_invervals):
        """Place ``sequence`` in the best of ``matching_invervals`` and update the free intervals.

        Returns the interval that was used (it is no longer part of
        ``self.intervals`` afterwards), or ``None`` when no placement was done.
        """
        if not matching_invervals:
            return None
        interval = self.getPreferredInterval(sequence, matching_invervals)
        if not interval:
            return None
        sequence_pos = self.getSequencePositionInInterval(sequence, interval)
        if self.insertSequenceInInterval(sequence, shs, interval, sequence_pos) != 0:
            # shs was not filled in: cutting the interval would rely on
            # stale or missing tsp/tep values.
            return None
        self.cutInterval(sequence, shs, interval)
        return interval

    def cutInterval(self, sequence: Sequence, shs: ScheduleHasSequences, interval: Interval) -> int:
        """Replace ``interval`` by what remains free around the placed sequence.

        The part before the sequence ends ``max_overhead`` before ``shs.tsp``;
        the part after starts at ``shs.tep``. Parts with a non-positive
        duration are dropped. ``self.intervals`` is kept sorted by start.
        Always returns 0.
        """
        inter_before = Interval(interval.start, shs.tsp - self.max_overhead)
        inter_after = Interval(shs.tep, interval.end)
        self.intervals.remove(interval)
        for remainder in (inter_before, inter_after):
            if remainder.duration > 0:
                self.intervals.append(remainder)
        self.intervals.sort(key=lambda candidate: candidate.start, reverse=False)
        return 0

    def getPotentialIntervals(self, sequence: Sequence):
        """Return the intervals having a strictly positive usable overlap with the sequence window."""
        return [
            interval for interval in self.intervals
            if self._usable_overlap(sequence, interval) > 0
        ]