userSimulator.py 3.28 KB
import sys
import os
import datetime
import time
import json
import requests
from requests.auth import HTTPBasicAuth

DEBUG_FILE = True

class UserSimulator():
    conf_file = "../config/conf.json"
    conf_path = "../config/"
    request_path = "../resources/"
    login = "http://localhost:8000/user_manager/login"
    # This will call src/routine_manager/views.py:import_request()
    url = "http://localhost:8000/routine_manager/import_request"
    get_url = "http://localhost:8000/routine_manager/"
    sims = []
    ended = 0

    def userPrint(self, string: str):
        if DEBUG_FILE:
            print("UserSimulator : " + string)

    def __init__(self, argv):
        if (len(argv) > 1):
            self.conf_file = self.conf_path + argv[1]
            if DEBUG_FILE:
                print("conf_file = " + self.conf_file)
        else:
            self.userPrint("Using the default configuration file : conf.json")

    def parse(self):
        try:
            json_data = open(self.conf_file, 'r')
        except IOError:
            self.userPrint("Configuration file not found")
            sys.exit(0)
        json_content = json.loads(json_data.read())
        for dic in json_content[1:]:
            for key, value in dic.items():
                if (key == "userSimulator"):
                    self.sims.append(dic)
                    self.ended = max(dic["time"], self.ended)
        return (0)

    def sendRequest(self, file_name: str):
        files = {'request_file' : open(self.request_path + file_name, 'rb')}
        #print("Post request", self.url, "with args", files)
        # ex: Post request http://localhost:8000/routine_manager/import_request with args {'request_file': <_io.BufferedReader name='../resources/routine_request_01.xml'>}
        # This calls import_request() action from src/routine_manager/views.py
        r = self.client.post(self.url, files=files)
        if (r.status_code == 200):
            self.userPrint("The request has been successfully sent")
        else:
            self.userPrint("Sending request failed %d"%(r.status_code))
        return (0)

    def authenticate(self):
        self.client = requests.session()
        try:
            self.client.get("http://localhost:8000/user_manager/login")
            csrftoken = self.client.cookies['csrftoken']
            self.logindata = dict(email="pyros", password="DjangoPyros", csrfmiddlewaretoken=csrftoken)
            resp = self.client.post(self.login, data=self.logindata, headers=dict(Referer=self.login))
            self.userPrint("Connection status : %d"%(resp.status_code))
        except Exception as e:
            print(e, file=sys.stderr)
            return 1
        return 0

    def run(self):
        i = 0
        self.parse()
        if self.authenticate():
            return 1
        self.userPrint("The simulator will end in %d seconds"%(int(self.ended)))
        while (i <= self.ended):
            for dic in self.sims:
                if (int(dic["time"]) == i):
                    # call src/routine_manager/views.py:import_request()
                    self.sendRequest(dic["userSimulator"])
                    ##break
            i += 1
            time.sleep(1)
            ##if i==3: break
        return (0)

if (__name__ == "__main__"):
    sim = UserSimulator(sys.argv)
    sys.exit(sim.run())