Commit 0716eccb5b065ccc7d465a35e93e201e7113ebb0

Authored by Etienne Pallier
1 parent ddc7106a
Exists in dev

Bye Bye Celery (1 - from pyros.py)

Showing 1 changed file with 32 additions and 150 deletions   Show diff stats
pyros.py
... ... @@ -9,10 +9,6 @@ import argparse
9 9 import time
10 10 import signal
11 11  
12   -# For USE_CELERY global variable
13   -#from src.pyros import settings
14   -USE_CELERY = False
15   -#USE_CELERY = True
16 12  
17 13 DEBUG = True
18 14 SIMULATOR_CONFIG_FILE = "conf.json" #default config file
... ... @@ -54,28 +50,27 @@ class Utils:
54 50 print("Current directory : " + str(os.getcwd()))
55 51 return 0
56 52  
57   - def replacePatternInFile(self, pattern, replace, file_path):
  53 + def replacePatternInFile(self, pattern, replacement, file_path):
58 54 #try:
59 55 with fileinput.FileInput(file_path, inplace=True, backup='.bak') as file:
60 56 for line in file:
61   - print(line.replace(pattern, replace), end='')
  57 + print(line.replace(pattern, replacement), end='')
62 58 '''
63 59 except:
64 60 return 1
65 61 return 0
66 62 '''
67 63 # Now, check that replacement has been done or else ERROR !!!
68   - #pattern = "CELERY_TEST = "+str(set_it)
69 64 FOUND=False
70 65 #with fileinput.FileInput(file_path) as file:
71 66 with open(file_path) as file:
72 67 for line in file:
73   - #if replace in line:
74   - if line.startswith(replace):
  68 + #if replacement in line:
  69 + if line.startswith(replacement):
75 70 FOUND = True
76 71 break
77 72 #if FOUND: print("pattern "+pattern+" found in file "+file_path)
78   - if not FOUND: raise(Exception("pattern "+replace+" not found in file "+file_path))
  73 + if not FOUND: raise(Exception("pattern "+replacement+" not found in file "+file_path))
79 74  
80 75  
81 76 def printColor(self, color, message, file=sys.stdout, eol=os.linesep, forced=False):
... ... @@ -113,7 +108,6 @@ class AManager(Utils):
113 108 python_version = sys.version_info
114 109  
115 110 bin_dir = ""
116   - celery = "celery"
117 111 venv_pip = "pip"
118 112 venv_bin = "python"
119 113 wait = True
... ... @@ -141,34 +135,29 @@ class AManager(Utils):
141 135 self.bin_dir = "Scripts"
142 136 self.bin_name = "python.exe"
143 137 self.pip_name = "pip.exe"
144   - self.celery = "celery.exe"
145 138 else:
146 139 self.bin_dir = "bin"
147 140 self.bin_name = "python"
148 141 self.pip_name = "pip"
149   - self.celery = "celery"
150 142 self.venv_pip = self.path_dir_file + os.sep + config["path"] + os.sep + config["env"] + os.sep + self.bin_dir + os.sep + self.pip_name
151 143 self.venv_bin = self.path_dir_file + os.sep + config["path"] + os.sep + config["env"] + os.sep + self.bin_dir + os.sep + self.bin_name
152   - self.venv_cel = self.path_dir_file + os.sep + config["path"] + os.sep + config["env"] + os.sep + self.bin_dir + os.sep + self.celery
153 144  
154 145 def help(self):
155 146 print("This function must be implemented")
156 147 raise(NotImplementedError("Function not implemented"))
157 148  
158 149  
159   - def set_celery_test_and_simulator_modes_to(self, set_it:bool, simulator:bool=True):
  150 + def set_simulator_mode_to(self, set_it:bool):
160 151 file_path = "pyros/settings.py"
161 152 '''
162   - self.replacePatternInFile("CELERY_TEST = False", "CELERY_TEST = True", file_path)
163 153 if simulator: self.replacePatternInFile("SIMULATOR = False", "SIMULATOR = True", file_path)
164 154 '''
165   - self.replacePatternInFile("CELERY_TEST = "+str(not set_it), "CELERY_TEST = "+str(set_it), file_path)
166   - if simulator: self.replacePatternInFile("SIMULATOR = "+str(not set_it), "SIMULATOR = "+str(set_it), file_path)
  155 + self.replacePatternInFile("SIMULATOR = "+str(not set_it), "SIMULATOR = "+str(set_it), file_path)
167 156  
168 157  
169 158 def signal_handler(self, signal, frame):
170 159 self.printFullTerm(Colors.WARNING, "Ctrl-c catched")
171   - self.set_celery_test_and_simulator_modes_to(False)
  160 + self.set_simulator_mode_to(False)
172 161 for p in self.subproc:
173 162 proc, name = p
174 163 self.printColor(Colors.BLUE, "Killing process " + str(name))
... ... @@ -448,12 +437,10 @@ class Pyros(AManager):
448 437 return self.execProcess("rm logs/*.log")
449 438  
450 439 def test(self):
451   - ##self.singleWorker("scheduling")
452 440 self.changeDirectory("src")
453 441 ##self.execProcessFromVenv(self.venv_bin + " manage.py test")
454 442 self.execProcessFromVenvAsync(self.venv_bin + " manage.py test")
455 443 self.changeDirectory("..")
456   - ##self.stop_workers()
457 444 return 0
458 445  
459 446 def migrate(self):
... ... @@ -487,7 +474,7 @@ class Pyros(AManager):
487 474  
488 475 def reset_config(self):
489 476 self.changeDirectory("src")
490   - self.set_celery_test_and_simulator_modes_to(False)
  477 + self.set_simulator_mode_to(False)
491 478 self.addExecuted(self.current_command, "reset configuration")
492 479 self.changeDirectory("..")
493 480 return 0
... ... @@ -514,50 +501,6 @@ class Pyros(AManager):
514 501 return 0
515 502  
516 503  
517   - def celery_on(self, TOTAL=True):
518   - self.changeDirectory("src")
519   -
520   - if TOTAL: self.execProcessFromVenvAsync(self.venv_cel + " worker -A pyros -Q alert_listener_q -n pyros@alert_listener -c 1")
521   - if TOTAL: self.execProcessFromVenvAsync(self.venv_cel + " worker -A pyros -Q monitoring_q -n pyros@monitoring -c 1")
522   -
523   - #self.singleWorker("majordome")
524   - self.execProcessFromVenvAsync(self.venv_cel + " worker -A pyros -Q majordome_q -n pyros@majordome -c 1")
525   -
526   - #self.singleWorker("scheduling")
527   - self.execProcessFromVenvAsync(self.venv_cel + " worker -A pyros -Q scheduling_q --purge -n pyros@scheduling -c 1")
528   -
529   - #self.singleWorker("execute_plan_vis")
530   - self.execProcessFromVenvAsync(self.venv_cel + " worker -A pyros -Q execute_plan_vis_q --purge -n pyros@execute_plan_vis -c 1")
531   -
532   - if TOTAL: self.execProcessFromVenvAsync(self.venv_cel + " worker -A pyros -Q night_calibrations_q --purge -n pyros@night_calibrations -c 1")
533   -
534   - #self.singleWorker("execute_plan_nir")
535   - self.execProcessFromVenvAsync(self.venv_cel + " worker -A pyros -Q execute_plan_nir_q --purge -n pyros@execute_plan_nir -c 1")
536   -
537   - #self.singleWorker("create_calibrations")
538   - self.execProcessFromVenvAsync(self.venv_cel + " worker -A pyros -Q create_calibrations_q --purge -n pyros@create_calibrations -c 1")
539   -
540   - #self.singleWorker("analysis")
541   - self.execProcessFromVenvAsync(self.venv_cel + " worker -A pyros -Q analysis_q --purge -n pyros@analysis -c 1")
542   -
543   - self.changeDirectory("..")
544   - return 0
545   -
546   - # ex start()
547   - def start_workers(self):
548   - self.stop_workers()
549   - self.celery_on()
550   - return 0
551   -
552   - # ex stop()
553   - def stop_workers(self):
554   - if (self.system == "Windows"):
555   - self.execProcessAsync("taskkill /f /im celery.exe")
556   - self.execProcessAsync("taskkill /f /im python.exe")
557   - else:
558   - self.execProcessAsync("ps aux | grep \"celery worker\" | awk '{print $2}' | xargs kill -9")
559   - return 0
560   -
561 504 def init_database(self):
562 505 self.makemigrations()
563 506 self.migrate()
... ... @@ -596,15 +539,6 @@ class Pyros(AManager):
596 539 what="all"
597 540 '''
598 541  
599   - ''' OLD CODE
600   - self.stop_workers()
601   - #self.celery_on()
602   - # Start the 3 main agents (via their celery worker) :
603   - self.startAgent("monitoring")
604   - self.startAgent("majordome")
605   - self.startAgent("alert_listener")
606   - '''
607   -
608 542 # Go into src/
609 543 self.changeDirectory("src")
610 544 #print("Current directory : " + str(os.getcwd()))
... ... @@ -655,17 +589,10 @@ class Pyros(AManager):
655 589  
656 590  
657 591  
658   - def startAgent(self, agent_name:str):
659   - # Start Celery Worker
660   - self.singleWorker(agent_name)
661   - time.sleep(3)
662   - # Put the agent task run() into the Worker queue
663   -
664   -
665 592 # Reset the database content
666 593 def reset_database_sim(self):
667 594 self.changeDirectory("src")
668   - self.set_celery_test_and_simulator_modes_to(True, False)
  595 + self.set_simulator_mode_to(True, False)
669 596 '''
670 597 Supprime toutes les donnรฉes de la base de donnรฉes
671 598 et rรฉexรฉcute tout gestionnaire de post-synchronisation.
... ... @@ -673,11 +600,11 @@ class Pyros(AManager):
673 600 '''
674 601 #TODO: remplacer par manage.py --noinput flush pour eviter le "echo yes"...
675 602 self.execProcess( self.venv_bin + " manage.py flush --noinput")
676   - self.set_celery_test_and_simulator_modes_to(True, False)
  603 + self.set_simulator_mode_to(True, False)
677 604 self.changeDirectory("..")
678 605  
679 606  
680   - # Simulation for the scheduler ONLY (and only some celery workers, not all of them)
  607 + # Simulation for the scheduler ONLY
681 608 def simulator_development(self):
682 609 self.simulator(False)
683 610  
... ... @@ -691,12 +618,11 @@ class Pyros(AManager):
691 618 delete from request;
692 619 '''
693 620  
694   - # Simulation (by default, with ALL simulators and ALL celery workers)
  621 + # Simulation (by default, with ALL simulators)
695 622 def simulator(self, TOTAL=True):
696 623 self.changeDirectory("src")
697 624  
698   - # Set CELERY_TEST mode ON
699   - self.replacePatternInFile("CELERY_TEST = False", "CELERY_TEST = True", "pyros/settings.py")
  625 + # Set SIMULATOR mode ON
700 626 self.replacePatternInFile("SIMULATOR = False", "SIMULATOR = True", "pyros/settings.py")
701 627  
702 628 # 0) Reset the database (Empty the database from any data)
... ... @@ -714,7 +640,7 @@ class Pyros(AManager):
714 640 self.loaddata()
715 641  
716 642 #
717   - # 1) Launch Django web server
  643 + # 1) Launch web server
718 644 #
719 645 self.server()
720 646 self.sleep(2)
... ... @@ -727,18 +653,11 @@ class Pyros(AManager):
727 653 self.printColor(Colors.GREEN, "If you want to shutdown the simulation, please run :")
728 654 self.printColor(Colors.GREEN, "CTRL-C or pyros.py kill_simulation")
729 655 self.printColor(Colors.GREEN, "If the simulation isn't correctly killed, please switch the variable")
730   - self.printColor(Colors.GREEN, "CELERY_TEST in src/pyros/settings.py to False")
  656 + self.printColor(Colors.GREEN, "SIMULATOR in src/pyros/settings.py to False")
731 657 self.printFullTerm(Colors.WARNING, "SUMMARY")
732 658  
733 659 #
734   - # 2) (if USE_CELERY) Start Celery workers
735   - #
736   - if USE_CELERY:
737   - self.celery_on(TOTAL)
738   - self.sleep(3)
739   -
740   - #
741   - # 3) Start simulator(s) :
  660 + # 2) Start simulator(s) :
742 661 #
743 662 self.sims_launch(TOTAL)
744 663  
... ... @@ -774,20 +693,6 @@ class Pyros(AManager):
774 693 self.changeDirectory("..")
775 694 '''
776 695  
777   - '''
778   - #
779   - # 2) (if not USE_CELERY) Start Agents
780   - #
781   - if TOTAL and not USE_CELERY :
782   - #pass
783   - # Start Environment Monitoring Agent
784   - # Start Majordome Agent
785   - # Start Alert Manager Agent
786   - self.start_agents()
787   - # Needed ?
788   - self.sleep(3)
789   - '''
790   -
791 696 # When simulators are finished:
792 697 #self.kill_simulation()
793 698 return 0
... ... @@ -799,8 +704,8 @@ class Pyros(AManager):
799 704 def test_majordome(self):
800 705 self.changeDirectory("src")
801 706  
802   - # Set CELERY_TEST mode ON
803   - self.set_celery_test_and_simulator_modes_to(True)
  707 + # Set SIMULATOR mode ON
  708 + self.set_simulator_mode_to(True)
804 709  
805 710 # 0) Reset the database (Empty the database from any data)
806 711 self.changeDirectory("..")
... ... @@ -824,19 +729,11 @@ class Pyros(AManager):
824 729 self.printColor(Colors.GREEN, "If you want to shutdown the simulation, please run :")
825 730 self.printColor(Colors.GREEN, "CTRL-C or pyros.py kill_simulation")
826 731 self.printColor(Colors.GREEN, "If the simulation isn't correctly killed, please switch the variable")
827   - self.printColor(Colors.GREEN, "CELERY_TEST in src/pyros/settings.py to False")
  732 + self.printColor(Colors.GREEN, "SIMULATOR in src/pyros/settings.py to False")
828 733 self.printFullTerm(Colors.WARNING, "SUMMARY")
829 734  
830   - # 2) (if USE_CELERY) Start Celery workers
831 735 #
832   - '''
833   - if USE_CELERY:
834   - self.celery_on(TOTAL)
835   - self.sleep(3)
836   - '''
837   -
838   - #
839   - # 3) Start simulator(s) :
  736 + # 2) Start simulator(s) :
840 737 #
841 738 #self.sims_launch(True)
842 739 agent='majordome'
... ... @@ -852,7 +749,7 @@ class Pyros(AManager):
852 749 # When simulators are finished:
853 750 #self.kill_simulation()
854 751 #self.changeDirectory("src")
855   - self.set_celery_test_and_simulator_modes_to(False)
  752 + self.set_simulator_mode_to(False)
856 753 self.changeDirectory("..")
857 754 self.execProcessAsync("ps aux | grep \"start_agent_majordome.py\" | awk '{ print $2 }' | xargs kill")
858 755  
... ... @@ -870,7 +767,7 @@ class Pyros(AManager):
870 767  
871 768 def kill_simulation(self):
872 769 self.changeDirectory("src")
873   - self.set_celery_test_and_simulator_modes_to(False)
  770 + self.set_simulator_mode_to(False)
874 771 self.changeDirectory("..")
875 772 if (self.system == "Windows"):
876 773 self.execProcessAsync("taskkill /f /im python.exe")
... ... @@ -884,12 +781,12 @@ class Pyros(AManager):
884 781  
885 782 self.kill_server()
886 783  
887   - # (if not using celery) Kill all agents :
888   - if not USE_CELERY:
889   - self.execProcessAsync("ps aux | grep \"start_agent_alert_manager.py\" | awk '{ print $2 }' | xargs kill")
890   - self.execProcessAsync("ps aux | grep \"start_agent_monitoring.py\" | awk '{ print $2 }' | xargs kill")
891   - self.execProcessAsync("ps aux | grep \"start_agent_majordome.py\" | awk '{ print $2 }' | xargs kill")
  784 + # Kill all agents :
  785 + self.execProcessAsync("ps aux | grep \"start_agent_alert_manager.py\" | awk '{ print $2 }' | xargs kill")
  786 + self.execProcessAsync("ps aux | grep \"start_agent_monitoring.py\" | awk '{ print $2 }' | xargs kill")
  787 + self.execProcessAsync("ps aux | grep \"start_agent_majordome.py\" | awk '{ print $2 }' | xargs kill")
892 788 #self.execProcessAsync("ps aux | grep \"start_agent_majordome.py\" | awk '{ print $2 }' | xargs kill")
  789 +
893 790 # Kill all simulators :
894 791 self.execProcessAsync("ps aux | grep \" domeSimulator.py\" | awk '{ print $2 }' | xargs kill")
895 792 self.execProcessAsync("ps aux | grep \" userSimulator.py\" | awk '{ print $2 }' | xargs kill")
... ... @@ -899,7 +796,6 @@ class Pyros(AManager):
899 796 self.execProcessAsync("ps aux | grep \" cameraNIRSimulator.py\" | awk '{ print $2 }' | xargs kill")
900 797 self.execProcessAsync("ps aux | grep \" cameraVISSimulator.py\" | awk '{ print $2 }' | xargs kill")
901 798 self.changeDirectory("..")
902   - if USE_CELERY: self.stop_workers()
903 799  
904 800 self.printFullTerm(Colors.GREEN, "simulation ended")
905 801 return 0
... ... @@ -1001,8 +897,8 @@ class Pyros(AManager):
1001 897 # Get back to project root folder
1002 898 self.changeDirectory("..")
1003 899  
1004   - # (if not celery) Launch agents (env monitor, major, alert mgr)
1005   - if not USE_CELERY: self.start_agents()
  900 + # Launch agents (env monitor, major, alert mgr)
  901 + self.start_agents()
1006 902  
1007 903 # Wait for end of simulators :
1008 904 for p in procs: p.wait()
... ... @@ -1012,12 +908,6 @@ class Pyros(AManager):
1012 908 return 0
1013 909  
1014 910  
1015   - def singleWorker(self, worker):
1016   - self.changeDirectory("src")
1017   - self.execProcessFromVenvAsync(self.venv_cel + " worker -A pyros -Q "+ worker +"_q -n pyros@"+worker+" -c 1")
1018   - self.changeDirectory("..")
1019   - return 0
1020   -
1021 911 def mysql_on(self):
1022 912 self.changeDirectory("src")
1023 913 self.replacePatternInFile("MYSQL = False", "MYSQL = True", "pyros/settings.py")
... ... @@ -1050,7 +940,6 @@ class Pyros(AManager):
1050 940 "unittest": self.unittest,
1051 941 "reset_config": self.reset_config,
1052 942 "test_all": self.test_all,
1053   - "celery_on": self.celery_on,
1054 943 "reset_database_sim": self.reset_database_sim,
1055 944 "init_database": self.init_database,
1056 945 "kill_server": self.kill_server,
... ... @@ -1065,10 +954,6 @@ class Pyros(AManager):
1065 954 "start_agent_alertmanager": self.start_agent_alertmanager,
1066 955 "test_majordome": self.test_majordome,
1067 956  
1068   - # ex start
1069   - "start_workers": self.start_workers,
1070   - # ex stop
1071   - "stop_workers": self.stop_workers,
1072 957 "simulator": self.simulator,
1073 958 "kill_simulation": self.kill_simulation,
1074 959 "sims_launch": self.sims_launch,
... ... @@ -1092,14 +977,11 @@ class Pyros(AManager):
1092 977 "updatedb": "Update the database",
1093 978 "kill_server": "Kill the web server on port 8000",
1094 979 "init_database": "Create a standard context for pyros in db",
1095   - "unittest": "Runs the tests that don't need celery",
  980 + "unittest": "Runs the unit tests",
1096 981 "test_all": "Run all the existing tests (this command needs to be updated when tests are added in the project",
1097   - "celery_on": "Starts celery workers",
1098   - "start_workers": "Starts the celery workers (then the web server ???)",
1099   - "stop_workers": "Stops the celery workers",
1100 982 "simulator": "Launch a simulation",
1101 983 "simulator_development": "Simulation for the scheduler only",
1102   - "kill_simulation": "kill the simulators / celery workers / web server",
  984 + "kill_simulation": "kill the simulators & web server",
1103 985 "sims_launch": "Launch only the simulators",
1104 986 }
1105 987  
... ...