pyros_api.py
6.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
#!/usr/bin/env python3
import os
from urllib3.exceptions import ProtocolError
from requests.exceptions import ConnectionError
import requests
import sys
import yaml
import click
import getpass
class PyrosAPI:
    """
    Request the PyROS API on behalf of a PyrosUser account.

    Every request is authenticated with a token. The token is cached in a
    file named ``TOKEN`` in the current working directory; when that file is
    missing, the user is prompted for a username and password and the token
    returned by the server is written to it.
    """

    # Example of a remote API root: http://pyros.irap.omp.eu/api/

    def __init__(self, host) -> None:
        """
        Store the API base url and obtain the authentication token.

        Args:
            host : Base url of the website without trailing slash
                   (None selects "http://localhost:8000")
        """
        if host is None:
            host = "http://localhost:8000"
        self.host = host + "/api/"
        self.get_token()

    def _send(self, method: str, url: str, **kwargs):
        """
        Send one HTTP request; print a message and exit(1) if the server is unreachable.

        Args:
            method : HTTP verb understood by requests ("get", "post", "put")
            url : Full url to request
            **kwargs : Passed through to requests.request (headers, params, data, files)
        Returns:
            The requests response object
        """
        try:
            return requests.request(method, url, **kwargs)
        except (ConnectionError, ConnectionResetError, ProtocolError):
            print(
                f"Server {self.host} doesn't respond. The website might be down.")
            sys.exit(1)

    def _auth_headers(self) -> dict:
        """Build the Authorization header carrying the current token."""
        return {"Authorization": f"Token {self.token}"}

    def get_token(self):
        """
        Load the authentication token into self.token.

        The token is read from the TOKEN file when it exists and is not
        empty. Otherwise the user is asked for a username and password, the
        token is requested from the api-token-auth/ endpoint and saved to the
        TOKEN file. The process exits with status 1 when the password cannot
        be read, the server is unreachable or the authentication is refused.
        """
        if os.path.exists("TOKEN"):
            with open("TOKEN", "r") as token_file:
                # Strip surrounding whitespace: a trailing newline left by an
                # editor would otherwise end up inside the Authorization header.
                cached_token = token_file.read().strip()
            if cached_token:
                self.token = cached_token
                return
            # An empty TOKEN file holds no usable token: authenticate again.

        url = f"{self.host}api-token-auth/"
        print("Token needs to be generated, you need to authentificate to PyROS :")
        username = input("Enter your username: ")
        try:
            password = getpass.getpass()
        except Exception as error:
            print('ERROR', error)
            # Without a password the request below cannot be built.
            sys.exit(1)

        response = self._send(
            "post", url, data={"username": username, "password": password})
        if response.status_code != 200:
            print("Authentification failed, please retry")
            sys.exit(1)

        self.token = response.json()["token"]
        with open("TOKEN", "w+") as token_file:
            token_file.write(self.token)

    def get_url(self, url: str) -> dict:
        """
        Query the url and return the response as json

        Args:
            url : Url to be requested without the base url of the website
        Returns:
            dict : Json object that represents the response of the API
        """
        response = self._send("get", self.host + url,
                              headers=self._auth_headers())
        return response.json()

    def submit_sequence_file(self, file: str) -> dict:
        """
        Submit sequence file by querying the API via PUT method

        Prints the response status code. On a 401 response the cached token
        file is deleted so that the next run asks for credentials again.

        Args:
            file : File path to the sequence file to be uploaded
        Returns:
            dict : Json object that represents the response of the API
        """
        url = f"{self.host}submit_sequence"
        # The context manager closes the file even when the upload fails.
        with open(file, "rb") as yaml_file:
            response = self._send("put", url, headers=self._auth_headers(),
                                  files={"sequence_file": yaml_file})
        print(f"Request status code {response.status_code}")
        if response.status_code == 401:
            print("Bad token, disconnecting from webserver. Please authenticate yourself next time.")
            self.delete_token()
        return response.json()

    def get_sequences_for_period(self, start_date: str, end_date: str) -> dict:
        """
        Return all the sequence between two dates

        Args:
            start_date : start date of the period we want to retrieve (format : day/month/year, example : 13/02/2022)
            end_date : end date of the period we want to retrieve (same format as start_date)
        Returns:
            dict : Json object that represents the response of the API
        """
        url = f"{self.host}full_sequences/get_sequences_for_period/"
        response = self._send(
            "get", url,
            params={"start_date": start_date, "end_date": end_date},
            headers=self._auth_headers())
        return response.json()

    def logout(self):
        """
        Request the logout/ endpoint with the current token.

        Returns:
            str : Body of the response decoded as UTF-8
        """
        response = self._send("get", f"{self.host}logout/",
                              headers=self._auth_headers())
        return response.content.decode("utf-8")

    def delete_token(self):
        """Remove the TOKEN file if it exists and report the deletion."""
        if os.path.exists("TOKEN"):
            os.remove("TOKEN")
            print("Token deleted")
@click.group()
@click.option('--host', '-h', help='host name (example: http://localhost:8000 or http://pyros.omp.eu) without last slash')
@click.pass_context
def cli(ctx, host):
    # Root command group: builds the API client shared by every sub-command.
    # (Kept as comments rather than a docstring: this group passes no help=,
    # so click would display a docstring as its help text.)
    # A missing or empty --host becomes None, which lets PyrosAPI pick its default.
    ctx.obj = PyrosAPI(host or None)
@cli.command("get_sequences_for_period", help="Get sequences for a period of date (start/end)")
@click.argument("start_date")
@click.argument("end_date")
@click.pass_obj
def get_sequences_for_period(api, start_date: str, end_date: str):
    """
    Print the API response listing the sequences between two dates.

    Args:
        api : PyrosAPI instance stored on the click context by the cli group
        start_date : first positional argument, forwarded unchanged to the API
        end_date : second positional argument, forwarded unchanged to the API
    """
    print(api.get_sequences_for_period(start_date, end_date))
@cli.command("logout", help="Logout")
@click.pass_obj
def logout(api):
    """
    Log out from the server, drop the cached token file and print the server reply.

    Args:
        api : PyrosAPI instance stored on the click context by the cli group
    """
    server_reply = api.logout()
    # delete_token removes the TOKEN file when present and prints "Token deleted".
    api.delete_token()
    print(server_reply)
@cli.command("submit_sequence", help="Submit a sequence file")
@click.argument("file_path")
@click.pass_obj
def submit_sequence(api, file_path):
    """
    Upload a sequence file and print the API response.

    Args:
        api : PyrosAPI instance stored on the click context by the cli group
        file_path : path of the sequence file to upload
    """
    print(api.submit_sequence_file(file_path))
@cli.command("query_url", help="Query an url to retrieve data")
@click.argument("url")
@click.pass_obj
def query_url(api, url):
    """
    Request an API url (given without the base url) and print the response.

    Args:
        api : PyrosAPI instance stored on the click context by the cli group
        url : url to request, appended to the API base url
    """
    print(api.get_url(url))
# Script entry point: hand control to the click command group.
# The empty dict is only the initial context object; the cli callback
# replaces it with a PyrosAPI instance.
if __name__ == "__main__":
    cli(obj={})