pyros_api.py
4.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
#!/usr/bin/env python3
from email import header
import os
import requests
import sys
import yaml
import click
import getpass
class PyrosAPI:
    """
    Request the PyROS API with the PyrosUser account linked to a token.

    The token is read from a local ``TOKEN`` file when that file exists and is
    not empty; otherwise the user is prompted for a username and password, the
    token is requested from the API and then cached in the ``TOKEN`` file.
    """
    BASE_PYROS_URL = "http://localhost:8000/api/"

    def __init__(self) -> None:
        """
        Initialize PyrosAPI by getting the authentication token required to use the API
        (see get_token; this may prompt on the terminal)
        """
        self.get_token()

    def _auth_headers(self) -> dict:
        """
        Build the Authorization header shared by every authenticated request
        Returns:
            dict : Header mapping carrying the current token
        """
        return {"Authorization": f"Token {self.token}"}

    def get_token(self) -> None:
        """
        Get the authentication token linked to the user account and store it in self.token

        The token is taken from the "TOKEN" file in the current directory when
        it holds one. Otherwise the username and password are asked on the
        terminal, sent to the api-token-auth/ endpoint, and the returned token
        is written to "TOKEN".
        The process exits with status 1 when the password cannot be read or
        when the API does not answer with status code 200.
        """
        if os.path.exists("TOKEN"):
            with open("TOKEN", "r") as token_file:
                # Strip surrounding whitespace: a hand-edited file usually ends
                # with a newline, which is not valid inside an HTTP header value.
                cached_token = token_file.read().strip()
            if cached_token:
                self.token = cached_token
                return
            # An empty TOKEN file carries no usable token: authenticate again.

        url = f"{self.BASE_PYROS_URL}api-token-auth/"
        print("Token needs to be generated, you need to authentificate to PyROS :")
        username = input("Enter your username: ")
        try:
            password = getpass.getpass()
        except Exception as error:
            # Without a password there is nothing to send; stop here instead of
            # failing later on an unbound variable.
            print('ERROR', error)
            sys.exit(1)
        request = requests.post(
            url, data={"username": username, "password": password})
        if request.status_code != 200:
            print("Authentification failed, please retry")
            sys.exit(1)
        self.token = request.json()["token"]
        with open("TOKEN", "w") as token_file:
            token_file.write(self.token)

    def get_url(self, url: str) -> dict:
        """
        Query the url with a GET request and return the response as json
        Args:
            url : Url to be requested without the base url of the website
        Returns:
            dict : Json object that represents the response of the API
        """
        request = requests.get(
            self.BASE_PYROS_URL + url, headers=self._auth_headers())
        return request.json()

    def submit_sequence_file(self, file: str) -> dict:
        """
        Submit sequence file by querying the API via PUT method

        The file content and the response status code are printed.
        Args:
            file : File path to the (YAML) sequence file to be uploaded
        Returns:
            dict : Json object that represents the response of the API
        """
        headers = {
            **self._auth_headers(),
            'Content-type': 'application/json',
        }
        url = f"{self.BASE_PYROS_URL}submit_sequence"
        # The context manager closes the file as soon as it has been parsed.
        with open(file, "r") as yaml_file:
            sequence_file_as_dict = yaml.safe_load(yaml_file)
        print(f"File content : {sequence_file_as_dict}")
        request = requests.put(url, headers=headers,
                               json=sequence_file_as_dict)
        print(f"Request status code {request.status_code}")
        return request.json()

    def get_sequences_for_period(self, start_date: str, end_date: str) -> dict:
        """
        Return all the sequences between two dates
        Args:
            start_date : start date of the period we want to retrieve (format : day/month/year, example : 13/02/2022)
            end_date : end date of the period we want to retrieve (same format as start_date)
        Returns:
            dict : Json object that represents the response of the API
        """
        url = f"{self.BASE_PYROS_URL}full_sequences/get_sequences_for_period/"
        response = requests.get(
            url,
            params={"start_date": start_date, "end_date": end_date},
            headers=self._auth_headers(),
        )
        return response.json()

    def logout(self) -> str:
        """
        Query the logout/ endpoint of the API with the current token
        Returns:
            str : Body of the response decoded as utf-8
        """
        url = f"{self.BASE_PYROS_URL}logout/"
        response = requests.get(url, headers=self._auth_headers())
        return response.content.decode("utf-8")
@click.group()
@click.pass_context
def cli(ctx):
    # Root command group: create the API client and store it on the click
    # context so each sub-command can receive it through @click.pass_obj.
    # (Kept as comments: this group has no explicit help text, so a docstring
    # would show up in the --help output.)
    api_client = PyrosAPI()
    ctx.obj = api_client
@cli.command("get_sequences_for_period", help="Get sequences for a period of date (start/end)")
@click.argument("start_date")
@click.argument("end_date")
@click.pass_obj
def get_sequences_for_period(api, start_date: str, end_date: str):
    """Print the API answer for the sequences between start_date and end_date."""
    print(api.get_sequences_for_period(start_date, end_date))
@cli.command("logout", help="Logout")
@click.pass_obj
def logout(api):
    """Call the API logout, delete the local TOKEN file if present, print the answer."""
    server_answer = api.logout()
    token_is_cached = os.path.exists("TOKEN")
    if token_is_cached:
        # Drop the cached token file now that the logout request was sent.
        os.remove("TOKEN")
        print("Token deleted")
    print(server_answer)
@cli.command("submit_sequence", help="Submit a sequence file")
@click.argument("file_path")
@click.pass_obj
def submit_sequence(api, file_path):
    """Submit the sequence file at file_path and print the API answer."""
    print(api.submit_sequence_file(file_path))
@cli.command("query_url", help="Query an url to retrieve data")
@click.argument("url")
@click.pass_obj
def query_url(api, url):
    """Query url (relative to the API base url) and print the API answer."""
    print(api.get_url(url))
# Script entry point: run the click command group when executed directly.
if __name__ == "__main__":
    cli(obj={})