import sys
import os
import datetime
import time
import json

import requests
from requests.auth import HTTPBasicAuth

# When True, the simulator prints progress messages on stdout.
DEBUG_FILE = True


class UserSimulator():
    """Replay scheduled request-file uploads against the local web server.

    A JSON configuration file lists entries; every entry carrying a
    "userSimulator" key names a request file to upload at the second given
    by its "time" key. ``run()`` parses that schedule, logs in, then ticks
    once per second and posts the files that are due.
    """

    conf_file = "../config/conf.json"
    conf_path = "../config/"
    request_path = "../resources/"
    login = "http://localhost:8000/user_manager/login"
    # This will call src/routine_manager/views.py:import_request()
    url = "http://localhost:8000/routine_manager/import_request"
    get_url = "http://localhost:8000/routine_manager/"
    # Class-level defaults kept for backward compatibility; __init__ gives
    # every instance its own list so schedules are not shared between
    # instances.
    sims = []
    ended = 0

    def userPrint(self, string: str):
        """Print ``string`` prefixed with the simulator name when DEBUG_FILE is set."""
        if DEBUG_FILE:
            print("UserSimulator : " + string)

    def __init__(self, argv):
        """Select the configuration file.

        Args:
            argv: command-line style argument list; when ``argv[1]`` is
                present it is taken as a file name inside ``conf_path``,
                otherwise the default ``conf_file`` is used.
        """
        # Per-instance state: never mutate the class-level list.
        self.sims = []
        self.ended = 0
        if len(argv) > 1:
            self.conf_file = self.conf_path + argv[1]
            if DEBUG_FILE:
                print("conf_file = " + self.conf_file)
        else:
            self.userPrint("Using the default configuration file : conf.json")

    def parse(self):
        """Load the configuration file and collect the simulator entries.

        Every entry after the first one that has a "userSimulator" key is
        appended to ``self.sims``; ``self.ended`` becomes the largest "time"
        value seen among those entries.

        Returns:
            0 on success. Exits the process when the file cannot be opened.
        """
        try:
            json_data = open(self.conf_file, 'r')
        except IOError:
            self.userPrint("Configuration file not found")
            # NOTE(review): exits with status 0 even though this is an error
            # path; kept as-is because callers may rely on it.
            sys.exit(0)
        # Close the configuration file as soon as it has been read.
        with json_data:
            json_content = json.load(json_data)
        # The first entry of the configuration is skipped
        # (presumably a header/global section - TODO confirm).
        for dic in json_content[1:]:
            if "userSimulator" in dic:
                self.sims.append(dic)
                self.ended = max(dic["time"], self.ended)
        return 0

    def sendRequest(self, file_name: str):
        """Upload one request file to ``self.url`` as the 'request_file' field.

        Args:
            file_name: name of a file located in ``request_path``.

        Returns:
            0 in all cases; success or failure is only reported via userPrint.
        """
        # ex: Post request http://localhost:8000/routine_manager/import_request
        # with args {'request_file': <_io.BufferedReader name='../resources/routine_request_01.xml'>}
        # This calls import_request() action from src/routine_manager/views.py
        with open(self.request_path + file_name, 'rb') as request_file:
            files = {'request_file': request_file}
            r = self.client.post(self.url, files=files)
        if r.status_code == 200:
            self.userPrint("The request has been successfully sent")
        else:
            self.userPrint("Sending request failed %d" % (r.status_code))
        return 0

    def authenticate(self):
        """Open an HTTP session and log in on the ``login`` URL.

        A first GET retrieves the 'csrftoken' cookie, which is then posted
        back together with the credentials.

        Returns:
            0 when the login request was sent, 1 when any step raised.
        """
        self.client = requests.session()
        try:
            self.client.get(self.login)
            csrftoken = self.client.cookies['csrftoken']
            # NOTE(review): credentials are hard-coded in the source.
            self.logindata = dict(email="pyros", password="DjangoPyros",
                                  csrfmiddlewaretoken=csrftoken)
            resp = self.client.post(self.login, data=self.logindata,
                                    headers=dict(Referer=self.login))
            self.userPrint("Connection status : %d" % (resp.status_code))
        except Exception as e:
            print(e, file=sys.stderr)
            return 1
        return 0

    def run(self):
        """Parse the schedule, authenticate, then replay it second by second.

        Returns:
            0 once the schedule has been played, 1 if authentication failed.
        """
        self.parse()
        if self.authenticate():
            return 1
        self.userPrint("The simulator will end in %d seconds" % (int(self.ended)))
        # One tick per second, from 0 up to and including the last scheduled time.
        for i in range(int(self.ended) + 1):
            for dic in self.sims:
                if int(dic["time"]) == i:
                    # call src/routine_manager/views.py:import_request()
                    self.sendRequest(dic["userSimulator"])
            time.sleep(1)
        return 0


if __name__ == "__main__":
    sim = UserSimulator(sys.argv)
    sys.exit(sim.run())