config_pyros.py
4.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import yaml
import os
import sys
import time
import pykwalify.core
from pykwalify.errors import SchemaError
sys.path.append(os.environ.get("PROJECT_ROOT_PATH", os.path.join(os.getcwd(),"../../../")))
from vendor.guitastro.src.guitastro import FileNames
class ConfigPyros:
    """Load a PyROS YAML configuration, validate it and build file-name contexts.

    Attributes set by ``__init__``:
        config_path: the project root joined with ``config/``; schema file
            names are resolved against this directory.
        pyros_config: the validated configuration returned by
            ``read_and_check_config_file``.
        fn: a ``FileNames`` instance on which one context has been created
            for every entry of ``pyros_config["general"]["fn_contexts"]``.
    """

    # Fallback project root used when PROJECT_ROOT_PATH is not set.
    _DEFAULT_PROJECT_ROOT = "../../../"
    # Seconds to wait between two readability checks of a config file.
    _READ_RETRY_DELAY = 0.5

    @staticmethod
    def _project_root() -> str:
        """Return PROJECT_ROOT_PATH from the environment, or the relative default."""
        return os.environ.get(
            "PROJECT_ROOT_PATH", ConfigPyros._DEFAULT_PROJECT_ROOT)

    @staticmethod
    def _wait_until_readable(path: str) -> None:
        """Block until ``path`` is readable, reporting each failed attempt.

        There is no timeout: the loop only ends once ``os.access`` succeeds.
        """
        while not os.access(path, os.R_OK):
            print(f"{path} can't be accessed, waiting for availability")
            time.sleep(ConfigPyros._READ_RETRY_DELAY)

    def __init__(self, pyros_config_file):
        """
        Read and validate the PyROS config file, then create the FileNames contexts.

        Args:
            pyros_config_file (str): Path to the config file; when None,
                "config_pyros_default.yml" is used.
        """
        # Resolve the project root once so that config_path and every context
        # root directory share the same fallback when the variable is unset
        # (previously the context root was joined onto None in that case).
        project_root = self._project_root()
        self.config_path = os.path.join(project_root, "config/")
        if pyros_config_file is None:
            pyros_config_file = "config_pyros_default.yml"
        self.pyros_config = self.read_and_check_config_file(pyros_config_file)

        fn = FileNames()
        contexts = self.pyros_config["general"]["fn_contexts"]
        for context, values in contexts.items():
            fn.fcontext_create(context, values["description"])
            # Select the new context so the settings below apply to it.
            fn.fcontext = context
            # "naming" takes precedence over "pathnaming" when both are set.
            if values.get("naming"):
                fn.naming(values["naming"])
            elif values.get("pathnaming"):
                fn.pathnaming(values["pathnaming"])
            fn.rootdir = os.path.join(project_root, values["root_dir"])
            fn.extension = values["extension"]
        self.fn = fn

    def check_and_return_config(self, yaml_file: str, schema_file: str) -> dict:
        """
        Check if yaml_file is valid for the schema_file and return a dictionary of the config file

        Args:
            yaml_file (str): Path to the config_file to be validated
            schema_file (str): Schema file name, resolved against self.config_path

        Returns:
            dict: dictionary of the config file (with values), or None when
            validation fails or the file cannot be read
        """
        validator = None
        try:
            self._wait_until_readable(yaml_file)
            validator = pykwalify.core.Core(
                source_file=yaml_file,
                schema_files=[os.path.join(self.config_path, schema_file)],
            )
            return validator.validate(raise_exception=True)
        except SchemaError:
            # validator is still None if the error was raised before the
            # assignment completed; there are no collected errors to list then.
            collected = validator.errors if validator is not None else []
            for error in collected:
                print("Error :", str(error).split(". Path")[0])
                print("Path to error :", error.path)
            return None
        except IOError:
            print("Error when reading the observatory config file")
            return None

    def read_and_check_config_file(self, yaml_file: str) -> dict:
        """
        Read the schema key of the config file to retrieve the schema name, then validate the config file.

        Calls check_and_return_config and returns its result. When validation
        yields None, an error message is printed and the process exits with
        status 1.

        Args:
            yaml_file (str): path to the config file

        Returns:
            dict: Dictionary of the config file (with values), or None when the
            file cannot be parsed or another exception occurs
        """
        try:
            self._wait_until_readable(yaml_file)
            with open(yaml_file, 'r') as stream:
                print(f"Reading {yaml_file}")
                config_file = yaml.safe_load(stream)
            result = self.check_and_return_config(
                yaml_file, config_file["schema"])
            if result is None:
                print(
                    "Error when reading and validating config file, please check the errors right above")
                # SystemExit is not an Exception subclass, so the handlers
                # below do not intercept it.
                sys.exit(1)
            return result
        except yaml.YAMLError as exc:
            print(exc)
        except Exception as e:
            print(e)
        return None
def main():
    """Manual check: load the default PyROS configuration and print its FileNames object."""
    # Relative locations used when this module is run directly as a script.
    os.environ.update({
        "DJANGO_PATH": "../../src/core/pyros_django",
        "PROJECT_ROOT_PATH": "../../../../",
    })
    default_config = os.path.join(
        os.environ["PROJECT_ROOT_PATH"], "config", "config_pyros_default.yml")
    config = ConfigPyros(default_config)
    config.fn.fcontext = "pyros_seq"
    print(config.fn)
    # Disabled example of a naming round trip, kept for reference:
    # file_fullname = "../../data/sequences_pickle/P1/20230319/16.p"
    # param = config.fn.naming_get(file_fullname)
    # print(param)
    # config.fn.fname = config.fn.naming_set(param)
    # print(config.fn.fname)
    # #config.fn.subdir = config.fn.subdir_from_filename(file_fullname)
    # print(type(config.fn.fname))
    # out_fullname = config.fn.join()
    # print(os.getcwd())
    # print(out_fullname)


# Run the manual check only when executed as a script, not on import.
if __name__ == "__main__":
    main()