#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Convert a NetCDF file (``.nc`` or ``.nc.gz``) to a CDF file.

Usable as a command-line script (see the usage text in ``main``) or as a
library through the ``NetCdf`` class.

Resources:
- http://pythonhosted.org/SpacePy/pycdf.html
- http://unidata.github.io/netcdf4-python/
"""

import gzip
import os
import os.path as op
import sys
from datetime import datetime
from datetime import timedelta
from shutil import copyfileobj

# Set before spacepy is imported, as in the original statement order.
# NOTE(review): presumably tells SpacePy where to keep its configuration
# directory -- confirm against the SpacePy documentation.
os.environ['SPACEPY'] = '/tmp'

import netCDF4
from spacepy import pycdf

# Directory receiving the log file and the temporary/converted files.
TEMP_FILE = '/tmp'
RESOLVER_URL = 'http://apus.irap.omp.eu:8080/amda-registry/resolver'


def error(error_message, e=None):
    """Report `error_message` and exit the program with status 1.

    The message (and the exception `e`, when given) is appended to
    ``converter.log`` in `TEMP_FILE` together with the command-line
    arguments, then written to stderr.

    - `error_message`: Human readable description of the failure.
    - `e`: Optional exception (or any printable detail) that caused it.

    Raises `SystemExit` (exit status 1) in every case.
    """
    try:
        with open(op.join(TEMP_FILE, 'converter.log'), 'a') as f_log:
            # One line per field so that successive entries stay readable.
            f_log.write('With arguments ' + ', '.join(sys.argv) + '\n')
            f_log.write(error_message + ((': %s' % e) if e else '') + '\n')
    except (IOError, OSError):
        # Logging is best effort: an unwritable log file must not hide the
        # real error, which is still reported on stderr below.
        pass

    sys.stderr.write(error_message + ((':\n %s\n' % e) if e else '\n'))
    sys.exit(1)


class NetCdf:
    """A NetCDF file that can be described or converted to a CDF file."""

    SPASE_DATE_FORMAT = '%Y%j%H%M%S'  # ex: 2016238000000

    def __init__(self, input_path):
        """Open the NetCDF file located at `input_path`.

        - `input_path`: Path of a ``.nc`` file, or of a gzipped ``.nc.gz``
          file which is first decompressed into `TEMP_FILE`.

        The opened dataset is stored in `self.netcdf`. Any failure is
        reported through `error()`, which exits the program.
        """
        self.cdf_path = None
        self.cdf = None
        self.temp_nc = None
        self.netcdf_path = None
        self.netcdf = None

        file_name = op.basename(input_path)
        if file_name.endswith('.nc.gz'):
            self.unzip(input_path)
        elif file_name.endswith('.nc'):
            self.netcdf_path = input_path
        else:
            # Everything after the first dot, or '' when there is no dot.
            error('Unknown file extension "%s".' % file_name.partition('.')[2])

        try:
            self.netcdf = netCDF4.Dataset(self.netcdf_path)
        except Exception as e:
            error('Can not open netCDF file %s' % self.netcdf_path, e)
        finally:
            # The decompressed copy is removed whether or not the dataset
            # could be opened, so that a failure does not leave it behind.
            if self.temp_nc:
                os.remove(self.temp_nc)

    def unzip(self, ncgz_input_path):
        """Decompress `ncgz_input_path` into a temporary ``.nc`` file.

        On success both `self.temp_nc` and `self.netcdf_path` are set to the
        path of the decompressed file. Any failure is reported through
        `error()`, which exits the program.
        """
        if not op.exists(ncgz_input_path):
            error('Compressed Net-CDF file is not found in "%s".' % ncgz_input_path)

        netcdf_temp_path = self.build_temp_path(ncgz_input_path, '.nc')
        try:
            with gzip.open(ncgz_input_path, 'rb') as f_in, open(netcdf_temp_path, 'wb') as f_out:
                copyfileobj(f_in, f_out)
        except Exception as e:
            error('Can not unzip compressed Net-CDF from %s to %s' % (ncgz_input_path, netcdf_temp_path), e)

        self.temp_nc = netcdf_temp_path
        self.netcdf_path = netcdf_temp_path

    def parse_date(self, str_date):
        """Parse the string `str_date` and return the matching datetime.

        `str_date` is read as ``YYYYDDDHHMMSS`` (characters after the 13th
        are ignored). The day-of-year field is incremented by one before
        being parsed with `SPASE_DATE_FORMAT`.
        NOTE(review): the increment suggests the input day-of-year is
        zero-based -- confirm against the data producer.

        Raises `ValueError` if the fields do not form a valid date.
        """
        day_of_year = int(str_date[4:7]) + 1
        # The day must stay zero-padded on three digits: '%j' accepts one
        # to three digits, so an unpadded day would swallow digits that
        # belong to the hour field (day 6 followed by '000000' would be
        # read as day 60).
        padded_date = '%s%03d%s' % (str_date[:4], day_of_year, str_date[7:13])
        return datetime.strptime(padded_date, self.SPASE_DATE_FORMAT)

    def parse_doubledate(self, ddate):
        """Parse the double `ddate` (POSIX timestamp) and return the naive UTC datetime."""
        return datetime.utcfromtimestamp(ddate)

    # NetCDF methods

    def get_netcdf_path(self):
        """Return the NetCDF path."""
        return self.netcdf_path

    def get_netcdf(self):
        """Return the NetCDF object."""
        return self.netcdf

    def describe(self):
        """Print every variable and global attribute of the NetCDF file."""

        def describe_var(var):
            """Print the type, dimensions, size and attributes of `var`."""
            print('== %s ==' % var.name)
            print(' - numpy type: %s ^ %s ' % (str(var.dtype), str(var.ndim)))
            print(' - dimension(s): %s' % ', '.join(list(var.dimensions)))
            print(' - size: %s = %d' % ('x'.join([str(n) for n in var.shape]), var.size))
            # One-dimensional character arrays are shown as a single string.
            if var.ndim == 1 and str(var.dtype) == '|S1':
                print(' - values: \'%s\', ...' % ''.join([c.decode('utf-8') for c in var[:]]))
            if var.ncattrs():
                print(' - Attributes:')
                for var_attr_name in var.ncattrs():
                    print(' - %s: %s' % (var_attr_name, getattr(var, var_attr_name)))

        for variable in self.netcdf.variables.values():
            describe_var(variable)

        print('== Global attributes ==')
        for global_attr_name in self.netcdf.ncattrs():
            print(' - %s: %s' % (global_attr_name, getattr(self.netcdf, global_attr_name)))

    # CDF methods

    @staticmethod
    def create_new_cdf(cdf_path):
        """Create a new empty CDF file in `cdf_path` and return the pyCDF object.

        The parent directory is created when missing and an existing file at
        `cdf_path` is removed first. Any failure is reported through
        `error()`, which exits the program.
        """
        # Create and clean a new directory for the CDF file.
        cdf_dir = op.dirname(cdf_path)
        # `cdf_dir` is empty for a bare file name: nothing to create then.
        if cdf_dir and not op.exists(cdf_dir):
            try:
                os.makedirs(cdf_dir)
            except OSError as e:
                error('Can not create directory %s' % cdf_dir, e)

        if op.exists(cdf_path):
            try:
                os.remove(cdf_path)
            except OSError as e:
                error('A CDF file already exist in %s and it can not be removed' % cdf_path, e)

        # Make the pyCDF object
        try:
            cdf = pycdf.CDF(cdf_path, '')
        except pycdf.CDFError as e:
            error('Can not create CDF file on %s, check that the directory exists and its writing access' % cdf_path, e)

        # TODO Get spase file
        # get_spase()
        # NOTE(review): placeholder value, and the attribute name ends with a
        # space. Kept byte-identical because readers of the produced files
        # may rely on it -- confirm before changing.
        cdf.attrs['Acknowledgement '] = 'John Doe'

        return cdf

    def get_spase(self):
        """Placeholder, not implemented yet (see the TODO in `create_new_cdf`)."""
        pass

    @staticmethod
    def build_temp_path(path, extension):
        """Return the absolute path, inside `TEMP_FILE`, of the base name of
        `path` cut at its first dot and followed by `extension`."""
        return op.abspath(op.join(TEMP_FILE, op.basename(path).split('.')[0] + extension))

    def get_cdf(self, cdf_path=None):
        """
        Convert and return the CDF object (only return it if it's already converted)

        - `cdf_path`: The path of the CDF file (needed for CDF creation),
          or a temp path if not specified.
        """
        # Compare with None: the CDF object is a container, so relying on its
        # truthiness could restart the conversion of a CDF without variable.
        if self.cdf is not None:
            return self.cdf

        self.cdf_path = op.abspath(cdf_path) if cdf_path else self.build_temp_path(self.netcdf_path, '.cdf')
        self.cdf = self.create_new_cdf(self.cdf_path)

        for global_attr_name in self.netcdf.ncattrs():
            self.cdf.attrs[global_attr_name] = getattr(self.netcdf, global_attr_name)

        for key, var in self.netcdf.variables.items():
            dimensions = list(var.dimensions)
            dtype_name = str(var.dtype)

            if dtype_name == '|S1':
                if len(dimensions) == 1:
                    # A single string: stored as a global attribute.
                    var_str = str(netCDF4.chartostring(var[:]).astype(str, copy=False))
                    self.cdf.attrs[key] = self.parse_date(var_str) if dimensions[0] == 'TimeLength' else var_str
                elif len(dimensions) == 2:
                    # A list of strings: stored as a variable.
                    var_strs = map(str, netCDF4.chartostring(var[:]).astype(str, copy=False))
                    self.cdf[key] = list(map(self.parse_date, var_strs) if dimensions[1] == 'TimeLength' else var_strs)
            elif dtype_name == 'float64' and key == 'Time':
                # Time in Double
                self.cdf[key] = list(map(self.parse_doubledate, var[:]))
            else:
                self.cdf[key] = var[:]

            # Character variables stored as a global attribute (or skipped
            # because they have more than two dimensions) created no CDF
            # variable to attach attributes to.
            if key not in self.cdf:
                continue
            for var_attr_name in var.ncattrs():
                self.cdf[key].attrs[var_attr_name] = getattr(var, var_attr_name)

        return self.cdf

    def get_cdf_path(self):
        """Return the path of the CDF file (None before any conversion)."""
        return self.cdf_path


def main():
    """Command-line entry point: convert or describe the file named in `sys.argv`."""
    if len(sys.argv) not in (2, 3):
        print('usage:')
        print('- `%s path/to/file.nc.gz` # Convert a Net-CDF file, '
              'save it in a temp directory, then display its path.' % sys.argv[0])
        print('- `%s -i path/to/file.nc.gz` # Display information about a Net-CDF file.' % sys.argv[0])
        print('- `%s path/to/file.nc.gz path/to/file.cdf` # Convert a Net-CDF file '
              'and save it in the specified path.' % sys.argv[0])
        print('This script can also be used as a Python library.')
        sys.exit(1)

    if len(sys.argv) == 2:
        netcdf = NetCdf(sys.argv[1])
        netcdf.get_cdf()
        print('File stored in "%s".' % netcdf.get_cdf_path())
    elif sys.argv[1] == '-i':
        netcdf = NetCdf(sys.argv[2])
        netcdf.describe()
    else:
        netcdf = NetCdf(sys.argv[1])
        netcdf.get_cdf(sys.argv[2])


if __name__ == '__main__':
    main()