get_speasy_invetories.py
5.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import json
import xml.etree.ElementTree as ET
from xml.dom import minidom
def create_element_with_id(tag, name, uid, parent_uid=None):
    """Build an XML element carrying an ``xml:id`` and a ``name`` attribute.

    The identifier is ``SPEASY_CDAWEB:<uid>``. When ``parent_uid`` is truthy
    it is placed in front of ``uid``, separated by a colon, so the result is
    ``SPEASY_CDAWEB:<parent_uid>:<uid>``.
    """
    qualified_uid = f"{parent_uid}:{uid}" if parent_uid else uid
    # Attribute order (xml:id first, then name) is kept so serialisation
    # produces the same text.
    return ET.Element(
        tag,
        {'xml:id': f"SPEASY_CDAWEB:{qualified_uid}", 'name': name},
    )
def parse_dimensions(shape):
    """Parse a textual shape description into ``(dim1, dim2)``.

    Recognised forms (surrounding whitespace is ignored):

    * a bare non-negative integer, e.g. ``"3"``      -> ``(3, None)``
    * a one-element tuple, e.g. ``"(3,)"`` or ``"(3)"`` -> ``(3, None)``
    * a longer tuple, e.g. ``"(3, 4)"`` or ``"(3, 4,)"`` -> ``(3, 4)``;
      entries beyond the second are ignored.

    Anything else (empty string, ``None``, ``"()"``, unrecognised text)
    yields ``(None, None)``. Non-string values are converted with ``str()``
    first, so an ``int`` or a ``tuple`` of ints is accepted as well.

    Raises:
        ValueError: if a non-empty entry inside the parentheses is not an
            integer.
    """
    dim1 = dim2 = None
    text = "" if shape is None else str(shape).strip()

    if text.isdecimal():
        # isdecimal() only accepts characters that int() can convert.
        dim1 = int(text)
    elif text.startswith("(") and text.endswith(")"):
        # Empty entries come from a trailing comma ("(3,)") or from an empty
        # tuple ("()"); dropping them lets one branch handle every tuple form.
        entries = (entry.strip() for entry in text[1:-1].split(","))
        sizes = [int(entry) for entry in entries if entry]
        if sizes:
            dim1 = sizes[0]
        if len(sizes) > 1:
            dim2 = sizes[1]

    return dim1, dim2
def getParamInfo(parameter, shape):
    """Write dimension and type attributes onto a parameter element.

    ``shape`` is handed to ``parse_dimensions``. ``dim1`` is written only
    when a first dimension was found; ``dim2`` is always written and is
    ``"1"`` when no second dimension was found. ``type`` and ``minSampling``
    are always set to the fixed values ``'double'`` and ``'4'``.
    """
    first_dim, second_dim = parse_dimensions(shape)

    # NOTE(review): this block was reconstructed from a listing that had lost
    # its indentation; it assumes only the dim1 write is guarded and that
    # dim2 is written unconditionally - confirm against the original file.
    if first_dim is not None:
        parameter.set('dim1', str(first_dim))
    parameter.set('dim2', "1" if second_dim is None else str(second_dim))

    for attribute, value in (('type', 'double'), ('minSampling', '4')):
        parameter.set(attribute, value)
def count_levels(data):
    """Return how many dictionaries deep ``data`` is nested.

    A non-dict counts as 0, an empty dict (or one holding no dicts) as 1,
    and each further level of nested dict adds 1 along the deepest branch.
    """
    if not isinstance(data, dict):
        return 0

    deepest_child = 0
    for child in data.values():
        child_depth = count_levels(child)
        if child_depth > deepest_child:
            deepest_child = child_depth
    return deepest_child + 1
def get_mission_levels(json_obj):
    """Map each top-level key whose value is a dict to its nesting depth.

    Depth is computed with ``count_levels``; top-level entries that are not
    dictionaries are left out of the result.
    """
    return {
        key: count_levels(value)
        for key, value in json_obj.items()
        if isinstance(value, dict)
    }
def add_instrument(observatory, observatory_data, mission_observatory_key):
    """Append instrument, dataset and parameter elements under ``observatory``.

    ``observatory_data`` is walked three levels deep:

    * every dict value holding ``'__spz_name__'`` becomes an ``instrument``;
    * inside it, every dict value holding ``'__spz_name__'`` becomes a
      ``dataset``;
    * inside that, every dict value whose ``'__spz_type__'`` equals
      ``'ParameterIndex'`` becomes a ``parameter``, completed by
      ``getParamInfo`` from its ``'spz_shape'`` entry.

    Element ids are prefixed with ``mission_observatory_key`` followed by the
    keys of the enclosing instrument and dataset, joined by colons.
    """
    for inst_id, inst_entry in observatory_data.items():
        if not (isinstance(inst_entry, dict) and '__spz_name__' in inst_entry):
            continue
        instrument = create_element_with_id(
            'instrument',
            inst_entry.get('__spz_name__', ''),
            inst_id,
            mission_observatory_key,
        )
        observatory.append(instrument)
        instrument_path = f"{mission_observatory_key}:{inst_id}"

        for ds_id, ds_entry in inst_entry.items():
            if not (isinstance(ds_entry, dict) and '__spz_name__' in ds_entry):
                continue
            dataset = create_element_with_id(
                'dataset',
                ds_entry.get('__spz_name__', ''),
                ds_id,
                instrument_path,
            )
            instrument.append(dataset)
            dataset_path = f"{instrument_path}:{ds_id}"

            for param_id, param_entry in ds_entry.items():
                if not isinstance(param_entry, dict):
                    continue
                if param_entry.get('__spz_type__') != 'ParameterIndex':
                    continue
                parameter = create_element_with_id(
                    'parameter',
                    param_entry.get('__spz_name__', ''),
                    param_id,
                    dataset_path,
                )
                getParamInfo(parameter, param_entry.get('spz_shape', ''))
                dataset.append(parameter)
def json_to_custom_xml(json_obj, levels):
    """Convert the inventory dictionary into the custom XML tree.

    The result is a ``dataRoot`` element holding a single ``dataCenter``
    (``SPEASY_CDAWEB``) with one ``mission`` element per top-level dict in
    ``json_obj``.

    Args:
        json_obj: inventory mapping; top-level values that are not dicts are
            skipped.
        levels: mapping of mission key to nesting depth, as produced by
            ``get_mission_levels``. A depth of 4 means the mission's children
            are handled as instruments directly; any other depth means every
            dict child is emitted as an ``observatory`` whose children are
            the instruments.

    Returns:
        The serialised XML as a ``str`` (not pretty-printed).

    Raises:
        KeyError: if ``levels`` has no entry for a mission found in
            ``json_obj``.
    """
    # Depth of a mission laid out as mission > instrument > dataset > parameter.
    flat_mission_depth = 4

    dataRoot = ET.Element('dataRoot')
    dataRoot.set('xml:id', "myRemoteData-treeRootNode")
    dataCenter = ET.Element('dataCenter')
    dataCenter.set('xml:id', "SPEASY_CDAWEB")
    dataCenter.set('name', "SPEASY_CDAWEB")
    dataRoot.append(dataCenter)

    # Every mission and every observatory is converted. The loops must not be
    # cut short (no `break`), otherwise part of the inventory is silently
    # missing from the output.
    for mission_key, mission_data in json_obj.items():
        if not isinstance(mission_data, dict):
            continue
        mission = create_element_with_id(
            'mission', mission_data.get('__spz_name__', ''), mission_key
        )
        dataCenter.append(mission)

        if levels[mission_key] == flat_mission_depth:
            # No observatory level: instruments hang directly off the mission.
            add_instrument(mission, mission_data, mission_key)
            continue

        for observatory_key, observatory_data in mission_data.items():
            if not isinstance(observatory_data, dict):
                continue
            observatory = create_element_with_id(
                'observatory',
                observatory_data.get('__spz_name__', ''),
                observatory_key,
                mission_key,
            )
            mission.append(observatory)
            add_instrument(
                observatory,
                observatory_data,
                f"{mission_key}:{observatory_key}",
            )

    return ET.tostring(dataRoot, encoding='unicode')
def pretty_print_xml(xml_str):
    """Return ``xml_str`` re-serialised by minidom, one element per line.

    Nested elements are indented with the string passed as ``indent`` and the
    output starts with an XML declaration.
    """
    return minidom.parseString(xml_str).toprettyxml(indent=" ")
def convert_json_file_to_xml(json_file_path, xml_file_path):
    """Read a JSON inventory file and write it out as pretty-printed XML.

    Args:
        json_file_path: path of the JSON file to read.
        xml_file_path: path of the XML file to create or overwrite.

    Both files are handled as UTF-8. The XML declaration written by the
    pretty-printer names no encoding, so readers assume UTF-8; relying on the
    platform's default encoding could produce an unreadable document when the
    inventory contains non-ASCII names.
    """
    with open(json_file_path, 'r', encoding='utf-8') as json_file:
        json_data = json.load(json_file)

    levels_dict = get_mission_levels(json_data)
    xml_data = json_to_custom_xml(json_data, levels_dict)
    pretty_xml = pretty_print_xml(xml_data)

    with open(xml_file_path, 'w', encoding='utf-8') as xml_file:
        xml_file.write(pretty_xml)
# Script entry: convert 'data.json' in the working directory into 'base.xml'.
convert_json_file_to_xml(
    json_file_path='data.json',
    xml_file_path='base.xml',
)