config_pyros.py
4.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import yaml
import os
import sys
import time
import pykwalify.core
from pykwalify.errors import SchemaError
sys.path.append(os.environ.get("PROJECT_ROOT_PATH", os.path.join(os.getcwd(),"../../../")))
from vendor.guitastro.src.guitastro import FileNames
class ConfigPyros:
    """Load a PyROS YAML configuration file and validate it with pykwalify.

    The configuration file names its own schema through its top-level
    ``schema`` key; that schema file is looked up in ``config_path``.

    Attributes set by ``__init__``:
        config_path: ``config/`` directory under ``PROJECT_ROOT_PATH``
            (``../../../`` when the environment variable is not set).
        pyros_config: validated configuration, or None when the file could
            not be parsed.
    """

    def __init__(self, pyros_config_file):
        """
        Load and validate the given configuration file.
        Args:
            pyros_config_file (str): Path to the config file, or None to use "config_pyros_default.yml"
        """
        self.config_path = os.path.join(
            os.environ.get("PROJECT_ROOT_PATH", "../../../"), "config/")
        if pyros_config_file is None:
            # NOTE(review): this default is resolved against the current
            # working directory, not against config_path - confirm intended.
            pyros_config_file = "config_pyros_default.yml"
        self.pyros_config = self.read_and_check_config_file(pyros_config_file)

    @staticmethod
    def _wait_until_readable(path: str) -> None:
        """
        Block until path is readable, polling every 0.5 s and printing a message after each failed check.
        There is no timeout: a path that never becomes readable blocks forever.
        """
        while not os.access(path, os.R_OK):
            print(
                f"{path} can't be accessed, waiting for availability")
            time.sleep(0.5)

    def check_and_return_config(self, yaml_file: str, schema_file: str) -> dict:
        """
        Check if yaml_file is valid for the schema_file and return a dictionary of the config file
        Args:
            yaml_file (str): Path to the config_file to be validated
            schema_file (str): Name of the schema file, relative to self.config_path
        Returns:
            dict: dictionary of the config file (with values), or None when
            the validation fails or the file cannot be read
        """
        # Bound before the try block so the SchemaError handler can tell
        # whether the validator was actually built.
        validator = None
        try:
            self._wait_until_readable(yaml_file)
            validator = pykwalify.core.Core(
                source_file=yaml_file,
                schema_files=[os.path.join(self.config_path, schema_file)])
            return validator.validate(raise_exception=True)
        except SchemaError as exc:
            if validator is None:
                # The error was raised while building the validator itself.
                print("Error :", exc)
            else:
                for error in validator.errors:
                    # Keep only the message part, the path is printed apart.
                    print("Error :", str(error).split(". Path")[0])
                    print("Path to error :", error.path)
            return None
        except IOError:
            print("Error when reading the observatory config file")
            return None

    def read_and_check_config_file(self, yaml_file: str) -> dict:
        """
        Read the schema key of the config file to retrieve schema name and proceed to the checking of that config file
        Call check_and_return_config function and return its result.
        Exits the process with status 1 when the validation fails.
        Args:
            yaml_file (str): path to the config file
        Returns:
            dict: Dictionary of the config file (with values), or None when
            the file cannot be parsed or has no usable "schema" key
        """
        try:
            self._wait_until_readable(yaml_file)
            with open(yaml_file, 'r') as stream:
                print(f"Reading {yaml_file}")
                config_file = yaml.safe_load(stream)
            result = self.check_and_return_config(
                yaml_file, config_file["schema"])
            if result is None:
                print(
                    "Error when reading and validating config file, please check the errors right above")
                # SystemExit is not an Exception subclass, so the broad
                # handler below does not swallow it.
                sys.exit(1)
            return result
        except yaml.YAMLError as exc:
            print(exc)
        except Exception as e:
            # Best effort: report the problem (e.g. missing "schema" key)
            # and fall through to the None return.
            print(e)
        return None
def main():
    """
    Manual smoke test: load the default PyROS configuration and print the ``fn`` attribute of the resulting ConfigPyros object.

    Overwrites the DJANGO_PATH and PROJECT_ROOT_PATH environment variables
    with relative paths, so the result depends on the current working
    directory.
    """
    os.environ["DJANGO_PATH"] = "../../src/core/pyros_django"
    os.environ["PROJECT_ROOT_PATH"] = "../../../../"
    cp = ConfigPyros(os.path.join(
        os.environ["PROJECT_ROOT_PATH"], "config", "config_pyros_default.yml"))
    # ConfigPyros as defined in this file never sets "fn"; look it up
    # defensively so the demo reports the problem instead of crashing with
    # an AttributeError.
    fn = getattr(cp, "fn", None)
    if fn is None:
        print("ConfigPyros object has no 'fn' attribute, skipping the file names demo")
        return
    fn.fcontext = "pyros_seq"
    print(fn)
    # Disabled experiments with the file naming API, kept for reference:
    # file_fullname = "../../data/sequences_pickle/P1/20230319/16.p"
    # param = cp.fn.naming_get(file_fullname)
    # print(param)
    # cp.fn.fname = cp.fn.naming_set(param)
    # print(cp.fn.fname)
    # #cp.fn.subdir = cp.fn.subdir_from_filename(file_fullname)
    # print(type(cp.fn.fname))
    # out_fullname = cp.fn.join()
    # print(os.getcwd())
    # print(out_fullname)


# Run the demo only when the file is executed as a script.
if __name__ == "__main__":
    main()