AgentScheduler.py
4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#!/usr/bin/env python3
import sys
##import utils.Logger as L
#import threading #, multiprocessing, os
import time
#from django.db import transaction
#from common.models import Command
sys.path.append("..")
sys.path.append("../../../..")
from src.core.pyros_django.agent.Agent import Agent, build_agent, log
# PM 20190416 recycle code
#from common.models import *
from common.models import Sequence
##log = L.setupLogger("AgentXTaskLogger", "AgentX")
class AgentScheduler(Agent):
# FOR TEST ONLY
# Run this agent in simulator mode
TEST_MODE = False
# Run the assertion tests at the end
TEST_WITH_FINAL_TEST = True
TEST_MAX_DURATION_SEC = None
#TEST_MAX_DURATION_SEC = 120
# PM 20190416 fucking config path starting: /home/patrick/Dev/PYROS/start_agent.py agentM
##_path_data = 'config'
_path_data = 'config/old_config'
log.debug("PLC instanciated")
AGENT_SPECIFIC_COMMANDS = [
#"do_replan",
#"do_stop_replan",
]
'''
# Who should I send commands to ?
#TEST_COMMANDS_DEST = "myself"
TEST_COMMANDS_DEST = "AgentA"
# Scenario to be executed
TEST_COMMANDS_LIST = [
"go_active",
"go_idle",
"go_active",
"go_idle",
"go_active",
"go_idle",
"exit",
]
'''
"""
=================================================================
FUNCTIONS RUN INSIDE MAIN THREAD
=================================================================
"""
# old config
# @override
#def __init__(self, name:str=None, config_filename=None, RUN_IN_THREAD=True):
# def __init__(self, config_filename=None, RUN_IN_THREAD=True):
# ##if name is None: name = self.__class__.__name__
# super().__init__(config_filename, RUN_IN_THREAD)
# new config (obsconfig)
#def __init__(self, name:str=None, RUN_IN_THREAD=True):
def __init__(self, name:str=None):
if name is None:
name = self.__class__.__name__
super().__init__()
#super().__init__(RUN_IN_THREAD)
# @override
def init(self):
super().init()
log.debug("end init()")
# --- Set the mode according the startmode value
##agent_alias = self.__class__.__name__
##self.set_mode_from_config(agent_alias)
'''
# @override
def load_config(self):
super().load_config()
'''
'''
# @override
def update_survey(self):
super().update_survey()
'''
'''
# @override
def get_next_command(self):
return super().get_next_command()
'''
# @override
def do_log(self):
super().do_log()
def replan_sequences(self):
print("\n start of sequences (re-)planning...\n")
time.sleep(5)
sequences = Sequence.objects.filter(status="TBP")
print("List of sequences to be planned :")
for seq in sequences:
print('-', seq.name, '('+seq.status+') :')
print('-- albums : ', seq.albums.all())
print('-- plans : ')
for album in seq.albums.all():
print(album)
for plan in album.plans.all():
print('plan id', plan.id)
print("\n ...end of sequences (re-)planning\n")
# Note : called by _routine_process() in Agent
# @override
def routine_process_before_body(self):
print("The Observatory configuration :")
self.show_config()
log.debug("in routine_process_before_body()")
#self.replan_sequences()
# Note : called by _routine_process() in Agent
# @override
def routine_process_after_body(self):
print("The Observatory configuration :")
#self.show_config()
log.debug("in routine_process_after_body()")
self.replan_sequences()
'''
# @override
def exec_specific_cmd_end(self, cmd:Command, from_thread=True):
super().exec_specific_cmd_end(cmd, from_thread)
'''
if __name__ == "__main__":
agent = build_agent(AgentScheduler)
print(agent)
agent.run()