Blame view

pyros_django/scheduler/Interval.py 6.2 KB
cc7b66b3   Etienne Pallier   renamed src/ to p...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
from common.models import *
# from decimal import Decimal
from utils.Logger import Logger
from utils.JDManipulator import *
from utils.highPrecision import *

"""
    Simple class that represents an interval of time.
    The start and end bounds should be expressed in Julian days.
"""


class Interval:
    """Time interval delimited by ``start`` and ``end``.

    Both bounds are stored as ``Decimal`` values and ``duration`` is kept
    equal to ``end - start`` every time a bound is (re)assigned.

    The constructor does not check that ``start <= end``: an interval built
    with ``end < start`` simply has a negative ``duration``.  Only the
    property setters enforce the ordering.
    """

    def __init__(self, start, end):
        """Build the interval from two values accepted by ``Decimal()``."""
        self._start = Decimal(start)
        self._end = Decimal(end)
        self.duration = self._end - self._start

    def __str__(self):
        """Return the interval formatted as ``[start - end]``."""
        # __str__ has to *return* the text: printing it and returning None
        # makes str(interval) / print(interval) raise TypeError.
        return "[%s - %s]" % (self.start, self.end)

    def _get_start(self):
        return self._start

    def _set_start(self, start):
        """Set the lower bound.

        Raises:
            ValueError: if ``start`` is greater than the current end.
        """
        # Convert like the constructor does, so that a float argument does
        # not break the Decimal subtraction used for ``duration``.
        start = Decimal(start)
        if start > self._end:
            # %s (not %d) so that fractional values are not truncated
            # in the message.
            raise ValueError(
                "Cannot set start (%s): must be lower than end (%s)" % (start, self._end))
        self._start = start
        self.duration = self._end - self._start

    def _get_end(self):
        return self._end

    def _set_end(self, end):
        """Set the upper bound.

        Raises:
            ValueError: if ``end`` is lower than the current start.
        """
        end = Decimal(end)
        if end < self._start:
            raise ValueError(
                "Cannot set end (%s): must be bigger than start (%s)" % (end, self._start))
        self._end = end
        self.duration = self._end - self._start

    start = property(fget=_get_start, fset=_set_start)
    end = property(fget=_get_end, fset=_set_end)

'''
    Class that manages a list of intervals.
    Only the Scheduler inherits from IntervalManagement.
'''


class IntervalManagement(Logger):
    """Maintain a list of free intervals and place sequences inside them.

    ``self.intervals`` holds the ``Interval`` objects that are still free.
    Placing a sequence removes the chosen interval from the list and puts
    back whatever remains before and after the sequence (see
    ``cutInterval``), keeping the list sorted by interval start.
    """

    def __init__(self, name: str = "IntervalManagement", file: str = "IntervalManagement"):
        super().__init__(name, file)
        self.max_overhead_seconds = 10
        # NOTE(review): presumably JD_VALUE is the number of seconds in a day,
        # which would make max_overhead the same delay expressed in days --
        # confirm in utils.JDManipulator.
        self.max_overhead = Decimal(self.max_overhead_seconds / JD_VALUE)
        self.intervals = []

    def _overlap(self, sequence: Sequence, interval: Interval):
        """Return the overlap of [jd1, jd2] with ``interval`` minus max_overhead.

        The result is negative or zero when the sequence window and the
        interval do not share more than ``max_overhead``.
        """
        return min(sequence.jd2, interval.end) - max(sequence.jd1, interval.start) - self.max_overhead

    def getMatchingIntervals(self, sequence: Sequence):
        """Return the free intervals able to hold the whole sequence.

        An interval matches when its overlap with the sequence window
        (overhead deducted) is greater than ``sequence.duration`` or is
        accepted as equal to it by ``is_nearby_equal``.
        The order of ``self.intervals`` is preserved.
        """
        matching_intervals = []
        for interval in self.intervals:
            overlap = self._overlap(sequence, interval)
            if overlap > sequence.duration or is_nearby_equal(overlap, sequence.duration):
                matching_intervals.append(interval)
        return matching_intervals

    def getPreferredInterval(self, sequence: Sequence, matching_intervals) -> Interval:
        """Pick, among ``matching_intervals``, the one to use for the sequence.

        Selection rules, in order:
          * no candidate: ``None`` is returned;
          * ``sequence.t_prefered == 0`` or a single candidate: the first one;
          * an interval accepted by ``is_between`` for ``t_prefered``: that one;
          * otherwise, at the first interval starting after ``t_prefered``,
            whichever of that interval and the previous one is closer to
            ``t_prefered`` (the overhead is added to the distance of the
            later interval);
          * if every interval starts before ``t_prefered``: the last one.

        The loop relies on ``matching_intervals`` being ordered by start.
        """
        # Callers such as placeSequence already test the result for falsiness;
        # an empty list used to raise IndexError instead.
        if not matching_intervals:
            return None
        if sequence.t_prefered == 0 or len(matching_intervals) == 1:
            return matching_intervals[0]

        for index, interval in enumerate(matching_intervals):
            if is_between(interval.start, sequence.t_prefered, interval.end):
                return interval
            if sequence.t_prefered < interval.start:
                if index == 0:
                    return interval
                previous = matching_intervals[index - 1]
                distance_to_next = interval.start + self.max_overhead - sequence.t_prefered
                distance_to_previous = sequence.t_prefered - previous.end
                if distance_to_next < distance_to_previous:
                    return interval
                return previous
        return matching_intervals[-1]

    def getSequencePositionInInterval(self, sequence: Sequence, interval: Interval) -> str:
        """Tell where in ``interval`` the sequence should be anchored.

        Returns:
            ``"START"``, ``"END"`` or ``"PREFERRED"``.  ``"PREFERRED"`` is only
            returned when ``is_between`` accepts ``t_prefered`` for the
            interval and neither the start-side nor the end-side check
            below applies.
        """
        half_duration = Decimal(0.5) * sequence.duration

        if not is_between(interval.start, sequence.t_prefered, interval.end):
            # Preferred time is outside the interval: stick to the side
            # chosen by comparing it with the interval start.
            if sequence.t_prefered < interval.start:
                return "START"
            return "END"

        # NOTE(review): the is_nearby_* helpers are defined elsewhere;
        # presumably they are tolerance-based <= / >= comparisons.
        if is_nearby_less_or_equal(sequence.t_prefered - half_duration, interval.start):
            return "START"
        if is_nearby_sup_or_equal(sequence.t_prefered + half_duration, interval.end):
            return "END"
        return "PREFERRED"

    def insertSequenceInInterval(self, sequence: Sequence, shs: ScheduleHasSequences,
                                 interval: Interval, position: str) -> int:
        """Fill the timing fields of ``shs`` for the requested position.

        Sets ``shs.tsp``, ``shs.tep``, ``shs.deltaTL`` and ``shs.deltaTR``:
        ``tep - tsp`` always equals ``sequence.duration``; ``deltaTL`` is the
        distance between ``tsp`` and the earliest allowed start, ``deltaTR``
        the distance between ``tep`` and the latest allowed end.

        Returns:
            1 if ``position`` is not one of ``"START"``, ``"END"``,
            ``"PREFERRED"`` (``shs`` is left untouched), 0 otherwise.
        """
        if position not in ["START", "END", "PREFERRED"]:
            return 1

        # Bounds the sequence must stay within inside this interval.
        earliest_start = max(interval.start + self.max_overhead, sequence.jd1)
        latest_end = min(interval.end, sequence.jd2)

        if position == "END":
            shs.tep = latest_end
            shs.tsp = shs.tep - sequence.duration
            shs.deltaTL = shs.tsp - earliest_start
            shs.deltaTR = Decimal(0)
        elif position == "PREFERRED":
            # Centre the sequence on t_prefered, without starting before jd1
            # nor closer than max_overhead to the interval start.
            start = max(sequence.jd1, sequence.t_prefered - Decimal(0.5) * sequence.duration)
            if start - interval.start < self.max_overhead:
                start = interval.start + self.max_overhead
            shs.tsp = start
            shs.tep = shs.tsp + sequence.duration
            shs.deltaTL = shs.tsp - earliest_start
            shs.deltaTR = latest_end - shs.tep
        else:
            shs.tsp = earliest_start
            shs.tep = shs.tsp + sequence.duration
            # Decimal like every other delta (was a plain int 0).
            shs.deltaTL = Decimal(0)
            shs.deltaTR = latest_end - shs.tep
        return 0

    def placeSequence(self, sequence: Sequence, shs: ScheduleHasSequences, matching_invervals):
        """Place the sequence in the best candidate interval.

        Chooses an interval, fills the timing fields of ``shs`` and removes
        the used span from ``self.intervals``.

        Returns:
            The interval that was used, or ``None`` when there is no
            candidate or the insertion reported an error.
        """
        if not matching_invervals:
            return None
        interval = self.getPreferredInterval(sequence, matching_invervals)
        if not interval:
            return None
        sequence_pos = self.getSequencePositionInInterval(sequence, interval)
        # Do not cut the interval if shs could not be filled.
        if self.insertSequenceInInterval(sequence, shs, interval, sequence_pos) != 0:
            return None
        self.cutInterval(sequence, shs, interval)
        return interval

    def cutInterval(self, sequence: Sequence, shs: ScheduleHasSequences, interval: Interval) -> int:
        """Replace ``interval`` by the free spans left around the placed sequence.

        The span before the sequence ends ``max_overhead`` before ``shs.tsp``;
        the span after it starts at ``shs.tep``.  Spans whose duration is not
        strictly positive are dropped.  ``self.intervals`` is re-sorted by
        start afterwards.  ``sequence`` is not used.

        Returns:
            Always 0.

        Raises:
            ValueError: if ``interval`` is not in ``self.intervals``.
        """
        inter_before = Interval(interval.start, shs.tsp - self.max_overhead)
        inter_after = Interval(shs.tep, interval.end)
        self.intervals.remove(interval)
        for remainder in (inter_before, inter_after):
            if remainder.duration > 0:
                self.intervals.append(remainder)
        self.intervals.sort(key=lambda free_interval: free_interval.start, reverse=False)
        return 0

    def getPotentialIntervals(self, sequence: Sequence):
        """Return the free intervals that overlap the sequence window at all.

        Unlike ``getMatchingIntervals`` the overlap (overhead deducted) only
        has to be strictly positive, not as long as the sequence.
        """
        return [
            interval for interval in self.intervals
            if self._overlap(sequence, interval) > 0
        ]