Commit 30b22ee6257986fbd74f0d7d342160e9a8417dfa

Authored by Etienne Pallier
1 parent 082ccda3
Exists in dev

Agent : agentX working with a command simulator

README.md
... ... @@ -67,14 +67,14 @@ This software has been tested and validated with the following configurations :
67 67 --------------------------------------------------------------------------------------------
68 68 ## LAST VERSION
69 69  
70   -Date: 04/03/2019
  70 +Date: 05/03/2019
71 71  
72 72 Author: E. Pallier
73 73  
74   -VERSION: 0.20.5
  74 +VERSION: 0.20.6
75 75  
76 76 Comment:
77   - pyros.py : enrichi commande "update" (fait aussi la mise à jour des requirements python)
  77 + Agent : agentX working with a command simulator
78 78  
79 79  
80 80 --------------------------------------------------------------------------------------------
... ...
pyros.py
... ... @@ -271,11 +271,14 @@ def install():
271 271 '''
272 272 TODO:
273 273 '''
274   -@pyros_launcher.command(help="Update the pyros software (git pull + update DB if necessary)")
  274 +@pyros_launcher.command(help="Update (only if necessary) the python packages AND the source code AND the DB structure")
275 275 def update():
276 276 print("Running update command")
277   - _gitpull() or die()
  277 + # 1) Update python packages (pip install requirements)
278 278 _update_python_packages_from_requirements() or die()
  279 + # 2) Update source code (git pull)
  280 + _gitpull() or die()
  281 + # 3) Update database structure (make migrations + migrate)
279 282 _updatedb() or die()
280 283 return True
281 284  
... ...
src/agent/Agent.py
1   -VERSION = "0.3"
  1 +VERSION = "0.4"
  2 +
  3 +
  4 +"""
  5 +=================================================================
  6 + IMPORT PYTHON PACKAGES
  7 +=================================================================
  8 +"""
2 9  
3 10 #from __future__ import absolute_import
4 11  
... ... @@ -36,7 +43,7 @@ import alert_manager.tasks
36 43 """
37 44 # from common.models import *
38 45 from common.models import Config, Log, PlcDeviceStatus
39   -from common.models import AgentSurvey, AgentCommand
  46 +from common.models import AgentSurvey, Command
40 47 from dashboard.views import get_sunelev
41 48 from devices.TelescopeRemoteControlDefault import TelescopeRemoteControlDefault
42 49  
... ... @@ -50,29 +57,60 @@ from majordome.MajordomeDecorators import *
50 57 from utils.JDManipulator import *
51 58 """
52 59  
53   -
54 60 from threading import Thread
55 61  
56   -DEBUG_FILE = False
57   -log = L.setupLogger("MajordomeTaskLogger", "Majordome")
58 62  
59   -"""
60   - Task to handle the execution of the program
61 63  
62   - check the environment status in database
63   - check the devices status (telescope / cameras)
64   - check if the last schedule made has to be planned
65   - launch schedule's sequences
  64 +
  65 +"""
  66 +=================================================================
  67 + GENERAL MODULE CONSTANT DEFINITIONS
  68 +=================================================================
66 69 """
67 70  
  71 +DEBUG_FILE = False
  72 +
  73 +log = L.setupLogger("AgentLogger", "Agent")
  74 +
68 75  
69 76  
  77 +
  78 +"""
  79 +=================================================================
  80 + class Agent
  81 +=================================================================
  82 +"""
  83 +
70 84 class Agent:
71 85 # (EP) do this so that Majordome can be run from a thread, and called with thread.start():
72 86 # class Majordome(Task, Thread):
73 87  
74   - name = "Generic Agent"
  88 + # FOR TEST ONLY
  89 + # Run this agent in simulator mode
  90 + SIMULATOR_MODE = True
  91 + SIMULATOR_COMMANDS = iter([
  92 + "go_active",
  93 + "go_idle",
  94 +
  95 + "go_active",
  96 + "go_idle",
  97 +
  98 + "go_active",
  99 +
  100 + "specific_1",
  101 + "specific_2",
  102 +
  103 + "stop"
  104 + ])
  105 + #_simulator_current_cmd_idx = 0
  106 + #_current_test_cmd = None
  107 + #_nb_test_cmds = 0
  108 +
  109 +
  110 + # Run for real, otherwise just print messages without really doing anything
75 111 FOR_REAL = True
  112 +
  113 + name = "Generic Agent"
76 114 mainloop_waittime = 3
77 115 subloop_waittime = 2
78 116 status = None
... ... @@ -94,7 +132,7 @@ class Agent:
94 132 CONFIG_DIR = "config"
95 133  
96 134 _agent_survey = None
97   - _agent_command = None
  135 +
98 136  
99 137 def __init__(self, name:str=None, config_filename:str=None):
100 138 self.set_mode(self.MODE_IDLE)
... ... @@ -115,15 +153,17 @@ class Agent:
115 153 self.config = ConfigPyros(config_filename)
116 154 if self.config.get_last_errno() != self.config.NO_ERROR:
117 155 raise Exception(f"Bad config file name '{config_filename}', error {str(self.config.get_last_errno())}: {str(self.config.get_last_errmsg())}")
  156 +
  157 + # Create 1st survey if none
118 158 #tmp = AgentSurvey.objects.filter(name=self.name)
119 159 #if len(tmp) == 0:
120 160 #nb_agents = AgentSurvey.objects.filter(name=self.name).count()
121 161 #if nb_agents == 0:
122 162 if not AgentSurvey.objects.filter(name=self.name).exists():
123 163 self._agent_survey = AgentSurvey.objects.create(name=self.name, validity_duration_sec=60, mode=self.mode, status=self.status)
  164 + print("Agent survey is", self._agent_survey)
124 165 #self._agent_survey = AgentSurvey(name=self.name, validity_duration_sec=60, mode=self.mode, status=self.status)
125 166 #self._agent_survey.save()
126   - print("agent is", self._agent_survey)
127 167  
128 168  
129 169 def __str__(self):
... ... @@ -170,6 +210,7 @@ class Agent:
170 210 """
171 211  
172 212 print()
  213 + print()
173 214 print("Starting main loop iteration...")
174 215 self.show_mode_and_status()
175 216  
... ... @@ -177,12 +218,13 @@ class Agent:
177 218  
178 219 self.update_survey()
179 220  
180   - self.test_send_command_to_myself()
  221 + if self.SIMULATOR_MODE: self.simulator_send_command_to_myself()
181 222  
182 223 # generic cmd in json format
183   - cmd = self.read_next_command()
  224 + print("---")
  225 + cmd = self.get_next_command()
184 226  
185   - self.general_process(cmd)
  227 + if cmd: cmd = self.general_process(cmd)
186 228 '''
187 229 if self.config.majordome_state == "STOP":
188 230 break
... ... @@ -196,7 +238,8 @@ class Agent:
196 238 # Sub-level loop (only if ACTIVE)
197 239 if self.is_active():
198 240 self.set_status(self.STATUS_PROCESS_LOOP)
199   - self.specific_process(cmd)
  241 + if cmd: self.specific_process(cmd)
  242 + print("---")
200 243  
201 244 self.waitfor(self.mainloop_waittime)
202 245  
... ... @@ -206,14 +249,6 @@ class Agent:
206 249  
207 250  
208 251  
209   - def general_process(self, cmd):
210   - pass
211   -
212   - # @abstract
213   - # to be implemented by subclasses
214   - def specific_process(self, cmd):
215   - raise NotImplemented()
216   -
217 252 def waitfor(self, nbsec):
218 253 print(f"Now, waiting for {nbsec} seconds...")
219 254 time.sleep(nbsec)
... ... @@ -221,6 +256,7 @@ class Agent:
221 256 def set_status(self, status:str):
222 257 print(f"Switching from status {self.status} to status {status}")
223 258 self.status = status
  259 + return False
224 260  
225 261 def set_mode(self, mode:str):
226 262 print(f"Switching from mode {self.mode} to mode {mode}")
... ... @@ -233,12 +269,11 @@ class Agent:
233 269 self.set_mode(self.MODE_ACTIVE)
234 270  
235 271 def set_idle(self):
236   - self.mode = self.MODE_IDLE
  272 + self.set_mode(self.MODE_IDLE)
237 273  
238 274 def show_mode_and_status(self):
239 275 print(f"CURRENT MODE is {self.mode} (with status {self.status})")
240 276  
241   -
242 277 def die(self):
243 278 self.set_status(self.STATUS_EXIT)
244 279  
... ... @@ -263,7 +298,7 @@ class Agent:
263 298 self.set_active()
264 299 return True
265 300  
266   - def set_mode_from_config(self,agent_alias):
  301 + def set_mode_from_config(self, agent_alias):
267 302 # --- Get the startmode of the AgentX
268 303 modestr = self.config.get_paramvalue(agent_alias,'general','startmode')
269 304 if self.config.get_last_errno() != self.config.NO_ERROR:
... ... @@ -314,7 +349,6 @@ class Agent:
314 349 namevalue = self.config.get_paramvalue(alias,unit_subtag,'name')
315 350 print("Unit {} alias is {}. Name is {}".format(unit_subtag,alias,namevalue))
316 351 print("------------------------------------------")
317   -
318 352 #params = self.config.get_params(unit_alias)
319 353 #for param in params:
320 354 # print("Unit component is {}".format(param))
... ... @@ -337,52 +371,114 @@ class Agent:
337 371 return False
338 372 """
339 373  
340   - def update_db_survey(self):
  374 + def update_survey(self):
341 375 print("Updating the survey database table...")
342 376 self._agent_survey = AgentSurvey.objects.get(name=self.name)
343 377 self._agent_survey.mode = self.mode
344 378 self._agent_survey.status = self.status
345 379 self._agent_survey.save()
346 380  
347   - def test_send_command_to_myself(self):
348   - agent_command = AgentCommand.objects.create(sender=self.name, receiver=self.name, command="toto")
349   - #(sender=self.name, receiver=self.name, receiver_error_code=1)
350 381  
351   - def read_next_command(self):
  382 + def simulator_send_command_to_myself(self):
  383 + #self._current_test_cmd = "go_idle" if self._current_test_cmd=="go_active" else "go_active"
  384 + #if self._nb_test_cmds == 4: self._current_test_cmd = "stop"
  385 + cmd = next(self.SIMULATOR_COMMANDS, None)
  386 + if not cmd: return
  387 + agent_command = Command.objects.create(sender=self.name, receiver=self.name, command=cmd)
  388 + #self._simulator_current_cmd_idx += 1
  389 + #self._nb_test_cmds += 1
  390 +
  391 +
  392 + def get_next_command(self)->Command:
  393 + """
  394 + Return next command (read from the DB command table) which is relevant to this agent
  395 + Commands are read in chronological order
  396 + """
  397 +
352 398 print("Looking for new commands from the database ...")
353   - agent_commands = AgentCommand.objects.filter(receiver=self.name, receiver_status_code=AgentCommand.RECEIVER_STATUS_CODES.RSCODE_PENDING)
354   - # Is there any command for me to execute ?
355   - if agent_commands:
356   - print(f"I have received {len(agent_commands)} new command(s) to execute")
357   - for agent_command in agent_commands:
358   - self.exec_cmd(agent_command)
359   -
360   - def exec_cmd(self, ac:AgentCommand):
361   - print(f"Executing command {ac.command} sent by agent {ac.sender} at {ac.sender_deposit_time}")
362   - print(ac)
363   - # Already read this command ?
364   - if ac.receiver_read_time:
365   - print("I have already read this command before, so remove it")
366   - #ac.delete()
367   - return
368   - # Update table to say that the command has been READ
369   - ac.receiver_read_time = datetime.datetime.now()
370   - '''
371   - if (ac.receiver_read_time - ac.sender_deposit_time) > ac.validity_duration_sec:
372   - print("This command is expired, so remove it")
373   - #ac.delete()
374   - return
375   - '''
376   - # Execute this command
377   - # TODO:
378   - print("...command being executed...")
  399 +
  400 + # 1) Is there a command currently being processed (status CMD_RUNNING) ?
  401 + # If so, return
  402 + if Command.objects.filter(
  403 + # only commands for me
  404 + receiver = self.name,
  405 + # only pending commands
  406 + receiver_status_code = Command.CMD_STATUS_CODES.CMD_RUNNING,
  407 + ).exists():
  408 + print("There is currently a running command, so I do nothing (wait for end of execution)")
  409 + return None
  410 +
  411 + # 2) Get only the oldest PENDING command which I am recipient of
  412 + cmd = Command.objects.filter(
  413 + # only commands for me
  414 + receiver = self.name,
  415 + # only pending commands
  416 + receiver_status_code = Command.CMD_STATUS_CODES.CMD_PENDING,
  417 + ).order_by('sender_deposit_time').first()
  418 + #).order_by('sender_deposit_time')
  419 + #print("all commands", next_command)
  420 + #next_command = next_command.first()
  421 + if not cmd: return None
  422 + print(f"Got command {cmd.command} sent by agent {cmd.sender} at {cmd.sender_deposit_time}")
  423 + print(cmd)
  424 +
  425 + # 3) Update read time to say that the command has been READ
  426 + assert cmd.receiver_read_time is None # f"Command {cmd} should not have been already read !!"
  427 + cmd.receiver_read_time = datetime.datetime.now()
  428 + cmd.save()
  429 + return cmd
  430 +
  431 +
  432 + def general_process(self, cmd:Command)->Command:
  433 +
  434 + # Precondition : command cmd has already been read
  435 + assert cmd.receiver_read_time is not None # f"Command {cmd} should have been already read !!"
  436 +
  437 + print(f"Starting general processing of command {cmd.command} sent by agent {cmd.sender} at {cmd.sender_deposit_time}")
  438 + #print(cmd)
  439 +
  440 + # If expired command, change its status to expired and return
  441 + elapsed_time = cmd.receiver_read_time - cmd.sender_deposit_time
  442 + max_time = datetime.timedelta(seconds = cmd.validity_duration_sec)
  443 + print(f"Elapsed time is {elapsed_time}, (max is {max_time})")
  444 + if elapsed_time > max_time:
  445 + print("This command is expired, so mark it as expired, and ignore it")
  446 + #cmd.delete()
  447 + cmd.receiver_status_code = Command.CMD_STATUS_CODES.CMD_OUTOFDATE
  448 + cmd.save()
  449 + return None
  450 +
  451 + # If cmd is generic, execute it, change its status to executed, and return
  452 + if cmd.is_generic():
  453 + print("This command is generic, execute it...")
  454 + self.exec_generic_cmd(cmd)
  455 + print("...Generic cmd has been executed")
  456 + cmd.receiver_status_code = Command.CMD_STATUS_CODES.CMD_EXECUTED
  457 + cmd.save()
  458 + if cmd.command == "stop": exit(0)
  459 + return None
  460 +
  461 + # cmd is not generic
  462 + else:
  463 + # cmd is not generic but, as I am idle, change its status to SKIPPED, ignore it, and return
  464 + if self.mode == self.MODE_IDLE:
  465 + print("This command is not generic but, as I am IDLE, I mark it SKIPPED and ignore it")
  466 + cmd.receiver_status_code = Command.CMD_STATUS_CODES.CMD_SKIPPED
  467 + cmd.save()
  468 + return None
  469 +
  470 + # Je suis pas idle et cde pas générique: je la traite pas, elle sera traitée par core_process :
  471 + # attendre que cette commande soit exécutée avant de passer à la commande suivante (situation “bloquante” normale)
  472 + print("This command is not generic and, as I am not IDLE, I pass it to the specific processing")
  473 + print("(then I will wait for this command to be EXECUTED before going to next command)")
  474 + return cmd
  475 +
  476 +
  477 + def exec_generic_cmd(self, cmd:Command):
379 478 time.sleep(1)
380   - print("Command is executed")
381   - # Now, update table to say that the command has been EXECUTED
382   - ac.receiver_error_code = 0
383   - # Update table
384   - ####ac.save()
385   - # TODO: cleanup table from OLD lines already executed
  479 + if cmd.command == "go_active": self.set_active()
  480 + elif cmd.command == "go_idle": self.set_idle()
  481 + elif cmd.command == "stop": pass
386 482  
387 483  
388 484 def do_log(self):
... ... @@ -394,19 +490,16 @@ class Agent:
394 490 print("Logging data...")
395 491  
396 492  
397   - def core_process(self):
  493 + # @abstract
  494 + # to be implemented by subclasses
  495 + def specific_process(self, cmd:Command):
  496 + #raise NotImplemented()
398 497 """
399 498 Sublevel Loop (only if ACTIVE) :
400 499 PLUS TARD, maybe :start_process_thread() dans un thread : ensuite, à chaque tour de boucle il regarde si c'est fini ou pas, et si fini recommence
401 500 """
402   - assert(self.is_active())
403   -
404   - print("Starting the core process subloop...")
405   -
406   - self.waitfor(self.subloop_waittime)
407   -
408   - print("Ending core process subloop...")
409   -
  501 + assert self.is_active()
  502 + # TODO: LOG
410 503  
411 504  
412 505  
... ...
src/agent/AgentX.py
... ... @@ -2,6 +2,8 @@
2 2 import utils.Logger as L
3 3  
4 4 from .Agent import Agent
  5 +from common.models import Command
  6 +
5 7  
6 8  
7 9 log = L.setupLogger("AgentXTaskLogger", "AgentX")
... ... @@ -27,23 +29,40 @@ class AgentX(Agent):
27 29 # --- Set the mode according the startmode value
28 30 agent_alias = self.__class__.__name__
29 31 self.set_mode_from_config(agent_alias)
  32 + # TODO: remove
  33 + self.set_idle()
30 34  
31 35 # @override
  36 + '''
32 37 def load_config(self):
33 38 super().load_config()
  39 + '''
34 40  
35 41 # @override
  42 + '''
36 43 def update_survey(self):
37   - super().update_db_survey()
  44 + super().update_survey()
  45 + '''
38 46  
39 47 # @override
40   - def read_next_command(self):
41   - return super().read_next_command()
  48 + '''
  49 + def get_next_command(self):
  50 + return super().get_next_command()
  51 + '''
42 52  
43 53 # @override
44 54 def do_log(self):
45 55 super().do_log()
46 56  
47 57 # @override
48   - def specific_process(self, cmd):
49   - pass
50 58 \ No newline at end of file
  59 + def specific_process(self, cmd:Command):
  60 + super().specific_process(cmd)
  61 +
  62 + print("Starting specific process subloop...")
  63 + cmd.receiver_status_code = Command.CMD_STATUS_CODES.CMD_RUNNING
  64 +
  65 + self.waitfor(self.subloop_waittime)
  66 +
  67 + cmd.receiver_status_code = Command.CMD_STATUS_CODES.CMD_EXECUTED
  68 + cmd.save()
  69 + print("Ending specific process subloop...")
... ...
src/common/models.py
... ... @@ -210,7 +210,7 @@ class Request(models.Model):
210 210 ------------------------
211 211 """
212 212  
213   -class AgentCommand(models.Model):
  213 +class Command(models.Model):
214 214 """
215 215 | id | sender | receiver | command | validity_duration_sec (default=60) | sender_deposit_time | receiver_read_time
216 216 (l'agent destinataire en profite pour supprimer les commandes périmées qui le concernent)
... ... @@ -223,7 +223,7 @@ class AgentCommand(models.Model):
223 223 RSCODE_PENDING = 1 # cde en attente d'exécution
224 224 RSCODE_SKIPPED = 2 # cde ignorée (je suis idle… et j’ai ignoré cette commande, et je passe à la cde suivante)
225 225 RSCODE_OUTOFDATE = 3 # cde périmée
226   - RECEIVER_STATUS_CODES = (
  226 + CMD_STATUS_CODES = (
227 227 RSCODE_RUNNING, # en cours d’exécution => une fois la cde lue
228 228 RSCODE_EXECUTED, # cde exécutée => simulé par un sleep(3) dans AgentX.core_process())
229 229 RSCODE_PENDING, # cde en attente d'exécution
... ... @@ -231,14 +231,16 @@ class AgentCommand(models.Model):
231 231 RSCODE_OUTOFDATE, # cde périmée
232 232 )
233 233 """
234   - RECEIVER_STATUS_CODES = Choices(
235   - "RSCODE_RUNNING", # en cours d’exécution => une fois la cde lue
236   - "RSCODE_EXECUTED", # cde exécutée => simulé par un sleep(3) dans AgentX.core_process())
237   - "RSCODE_PENDING", # cde en attente d'exécution
238   - "RSCODE_SKIPPED", # cde ignorée (je suis idle… et j’ai ignoré cette commande, et je passe à la cde suivante)
239   - "RSCODE_OUTOFDATE" # cde périmée
  234 + CMD_STATUS_CODES = Choices(
  235 + "CMD_RUNNING", # en cours d’exécution => une fois la cde lue
  236 + "CMD_EXECUTED", # cde exécutée => simulé par un sleep(3) dans AgentX.core_process())
  237 + "CMD_PENDING", # cde en attente d'exécution
  238 + "CMD_SKIPPED", # cde ignorée (je suis idle… et j’ai ignoré cette commande, et je passe à la cde suivante)
  239 + "CMD_OUTOFDATE" # cde périmée
240 240 )
241 241  
  242 + GENERIC_COMMANDS = ["go_idle", "go_active", "stop"]
  243 +
242 244 #sender = models.CharField(max_length=50, blank=True, null=True, unique=True)
243 245 sender = models.CharField(max_length=50, help_text='sender agent name')
244 246 receiver = models.CharField(max_length=50)
... ... @@ -253,19 +255,26 @@ class AgentCommand(models.Model):
253 255 receiver_read_time = models.DateTimeField(null=True)
254 256 # - after execution
255 257 receiver_processed_time = models.DateTimeField(null=True)
256   - receiver_status_code = models.CharField(choices = RECEIVER_STATUS_CODES, default=RECEIVER_STATUS_CODES.RSCODE_PENDING, max_length=20)
257   - #receiver_status_code = models.IntegerField(choices=RECEIVER_STATUS_CODES, default=RSCODE_PENDING)
  258 + receiver_status_code = models.CharField(choices = CMD_STATUS_CODES, default=CMD_STATUS_CODES.CMD_PENDING, max_length=20)
  259 + #receiver_status_code = models.IntegerField(choices=CMD_STATUS_CODES, default=RSCODE_PENDING)
258 260 # TODO: maybe à mettre au format json (key:value)
259 261 result = models.CharField(max_length=400, blank=True)
260 262  
261 263 class Meta:
262 264 managed = True
263   - db_table = 'agent_command'
  265 + db_table = 'command'
264 266 #verbose_name = "agent survey"
265 267 #verbose_name_plural = "agents survey"
266 268  
267 269 def __str__(self):
268   - return (f"Agent {self.sender} sent commmand {self.command} to {self.receiver} at {self.sender_deposit_time}")
  270 + return (f"Agent {self.sender} sent commmand '{self.command}' to {self.receiver} at {self.sender_deposit_time}")
  271 +
  272 + def is_generic(self):
  273 + """
  274 + Is this a generic command ?
  275 + It is the case if command is of style "go_idle" or "go_active" or "stop"...
  276 + """
  277 + return self.command in self.GENERIC_COMMANDS
269 278  
270 279  
271 280 class AgentSurvey(models.Model):
... ...