pyros_api.py 6.38 KB
#!/usr/bin/env python3
import os
from urllib3.exceptions import ProtocolError
from requests.exceptions import ConnectionError
import requests
import sys
import yaml
import click
import getpass


class PyrosAPI:
    """
    Request Pyros API with an PyrosUser account linked to the username and password
    """
    #host = "http://pyros.irap.omp.eu/api/"

    def __init__(self, host) -> None:
        """
        Initialize PyrosAPI class by getting the authentification token required to use the API

        Args:
            username : Username of the user (mail adress)
            password : Password of the user
        """
        if host == None:
            self.host = "http://localhost:8000"
        else:
            self.host = host
        self.host = self.host+"/api/"
        self.get_token()

    def get_token(self):
        """
        get the authentification token linked to the user account

        Args:
            username : Username of the user (mail adress)
            password : Password of the user
        """
        if os.path.exists("TOKEN"):
            with open("TOKEN", "r") as token_file:
                self.token = token_file.read()
                return
        else:
            url = f"{self.host}api-token-auth/"
            print("Token needs to be generated, you need to authentificate to PyROS :")
            username = input("Enter your username: ")
            try:
                password = getpass.getpass()
            except Exception as error:
                print('ERROR', error)
            try:
                request = requests.post(
                    url, data={"username": username, "password": password})
            except (ConnectionError, ConnectionResetError, ProtocolError):
                print(
                    f"Server {self.host} doesn't respond. The website might be down.")
                exit(1)
            if request.status_code != 200:
                print("Authentification failed, please retry")
                exit(1)
            json_response = request.json()
            self.token = json_response["token"]
            with open("TOKEN", "w+") as token_file:
                token_file.write(self.token)

    def get_url(self, url: str) -> dict:
        """
        Query the url and return the response as json

        Args:
            url : Url to be requested without the base url of the website 

        Returns:
            dict : Json object that represents the response of the API
        """
        headers = {"Authorization": f"Token {self.token}"}
        try:
            request = requests.get(self.host+url, headers=headers)
        except (ConnectionError, ConnectionResetError, ProtocolError):
            print(
                f"Server {self.host} doesn't respond. The website might be down.")
            exit(1)
        return request.json()

    def submit_sequence_file(self, file: str) -> dict:
        """
        Submit sequence file by querying the API via PUT method

        Args:
            file : File path to the sequence file to be uploaded

        Returns:
            dict : Json object that represents the response of the API
        """
        headers = {
            "Authorization": f"Token {self.token}",
            'Content-type': 'application/json',
        }
        url = f"{self.host}submit_sequence"
        yaml_file = open(file, "r")
        sequence_file_as_dict = yaml.safe_load(yaml_file)
        print(f"File content : {sequence_file_as_dict}")
        try:
            request = requests.put(url, headers=headers,
                                   json=sequence_file_as_dict)
        except (ConnectionError, ConnectionResetError, ProtocolError):
            print(
                f"Server {self.host} doesn't respond. The website might be down.")
            exit(1)
        print(f"Request status code {request.status_code}")
        return request.json()

    def get_sequences_for_period(self, start_date: str, end_date: str) -> dict:
        """
        Return all the sequence between two dates
        Args:
            start_date : start date of the period we want to retrieve (format : day/month/year, example : 13/02/2022)
            end_date : end date of the period we want to retrieve (same format as start_date)
        Returns:
            dict : Json object that represents the response of the API
        """
        url = f"{self.host}full_sequences/get_sequences_for_period/"
        headers = {"Authorization": f"Token {self.token}"}
        try:
            response = requests.get(
                url, params={"start_date": start_date, "end_date": end_date}, headers=headers)
        except (ConnectionError, ConnectionResetError, ProtocolError):
            print(
                f"Server {self.host} doesn't respond. The website might be down.")
            exit(1)
        return response.json()

    def logout(self):
        url = f"{self.host}logout/"
        header = {"Authorization": f"Token {self.token}"}
        try:
            response = requests.get(url, headers=header)
        except (ConnectionError, ConnectionResetError, ProtocolError):
            print(
                f"Server {self.host} doesn't respond. The website might be down.")
            exit(1)
        return response.content.decode("utf-8")


@click.group()
@click.option('--host', '-h', help='host name (example: http://localhost:8000 or http://pyros.omp.eu) without last slash')
@click.pass_context
def cli(ctx, host):
    if not host:
        host = None
    ctx.obj = PyrosAPI(host)


@cli.command("get_sequences_for_period", help="Get sequences for a period of date (start/end)")
@click.argument("start_date")
@click.argument("end_date")
@click.pass_obj
def get_sequences_for_period(api, start_date: str, end_date: str):
    response = api.get_sequences_for_period(start_date, end_date)
    print(response)


@cli.command("logout", help="Logout")
@click.pass_obj
def logout(api):
    response = api.logout()
    if os.path.exists("TOKEN"):
        os.remove("TOKEN")
    print("Token deleted")
    print(response)


@cli.command("submit_sequence", help="Submit a sequence file")
@click.argument("file_path")
@click.pass_obj
def submit_sequence(api, file_path):
    response = api.submit_sequence_file(file_path)
    print(response)


@cli.command("query_url", help="Query an url to retrieve data")
@click.argument("url")
@click.pass_obj
def query_url(api, url):
    response = api.get_url(url)
    print(response)


if __name__ == '__main__':
    cli(obj={})