#!/usr/bin/env python3 from email import header import os import requests import sys import yaml import click import getpass class PyrosAPI: """ Request Pyros API with an PyrosUser account linked to the username and password """ BASE_PYROS_URL = "http://localhost:8000/api/" #BASE_PYROS_URL = "http://pyros.irap.omp.eu/api/" def __init__(self) -> None: """ Initialize PyrosAPI class by getting the authentification token required to use the API Args: username : Username of the user (mail adress) password : Password of the user """ self.get_token() def get_token(self): """ get the authentification token linked to the user account Args: username : Username of the user (mail adress) password : Password of the user """ if os.path.exists("TOKEN"): with open("TOKEN", "r") as token_file: self.token = token_file.read() return else: url = f"{self.BASE_PYROS_URL}api-token-auth/" print("Token needs to be generated, you need to authentificate to PyROS :") username = input("Enter your username: ") try: password = getpass.getpass() except Exception as error: print('ERROR', error) request = requests.post( url, data={"username": username, "password": password}) if request.status_code != 200: print("Authentification failed, please retry") exit(1) json_response = request.json() self.token = json_response["token"] with open("TOKEN", "w+") as token_file: token_file.write(self.token) def get_url(self, url: str) -> dict: """ Query the url and return the response as json Args: url : Url to be requested without the base url of the website Returns: dict : Json object that represents the response of the API """ headers = {"Authorization": f"Token {self.token}"} request = requests.get(self.BASE_PYROS_URL+url, headers=headers) return request.json() def submit_sequence_file(self, file: str) -> dict: """ Submit sequence file by querying the API via PUT method Args: file : File path to the sequence file to be uploaded Returns: dict : Json object that represents the response of the API """ headers = { "Authorization": f"Token {self.token}", 'Content-type': 'application/json', } url = f"{self.BASE_PYROS_URL}submit_sequence" yaml_file = open(file, "r") sequence_file_as_dict = yaml.safe_load(yaml_file) print(f"File content : {sequence_file_as_dict}") request = requests.put(url, headers=headers, json=sequence_file_as_dict) print(f"Request status code {request.status_code}") return request.json() def get_sequences_for_period(self, start_date: str, end_date: str) -> dict: """ Return all the sequence between two dates Args: start_date : start date of the period we want to retrieve (format : day/month/year, example : 13/02/2022) end_date : end date of the period we want to retrieve (same format as start_date) Returns: dict : Json object that represents the response of the API """ url = f"{self.BASE_PYROS_URL}full_sequences/get_sequences_for_period/" headers = {"Authorization": f"Token {self.token}"} response = requests.get( url, params={"start_date": start_date, "end_date": end_date}, headers=headers) return response.json() def logout(self): url = f"{self.BASE_PYROS_URL}logout/" header = {"Authorization": f"Token {self.token}"} response = requests.get(url, headers=header) return response.content.decode("utf-8") @click.group() @click.pass_context def cli(ctx): ctx.obj = PyrosAPI() @cli.command("get_sequences_for_period", help="Get sequences for a period of date (start/end)") @click.argument("start_date") @click.argument("end_date") @click.pass_obj def get_sequences_for_period(api, start_date: str, end_date: str): response = api.get_sequences_for_period(start_date, end_date) print(response) @cli.command("logout", help="Logout") @click.pass_obj def logout(api): response = api.logout() if os.path.exists("TOKEN"): os.remove("TOKEN") print("Token deleted") print(response) @cli.command("submit_sequence", help="Submit a sequence file") @click.argument("file_path") @click.pass_obj def submit_sequence(api, file_path): response = api.submit_sequence_file(file_path) print(response) @cli.command("query_url", help="Query an url to retrieve data") @click.argument("url") @click.pass_obj def query_url(api, url): response = api.get_url(url) print(response) if __name__ == '__main__': cli(obj={})