# coding=utf-8
from io import StringIO
import datetime
import gzip
import json
import logging
import random
import tarfile
import time
import urllib.request as urllib_request
import requests
import re # regex
from csv import writer as csv_writer, DictWriter as csv_dict_writer
from math import sqrt, isnan
from os import environ, remove as removefile
from os.path import isfile, join, abspath, dirname
# (i) Just ignore the requirements warning, we already require `dateutils`.
from dateutil.relativedelta import relativedelta
from dateutil import parser as dateparser
from flask import Flask
from flask import request
from flask import url_for, send_from_directory, abort as abort_flask
from jinja2 import Environment, FileSystemLoader
from jinja2.utils import markupsafe
from yaml import load as yaml_load, dump
from speasy import amda
import pandas as pd
try:
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader
from netCDF4 import Dataset, date2num
# PATH RELATIVITY #############################################################
THIS_DIRECTORY = dirname(abspath(__file__))


def get_path(relative_path):
    """Resolve `relative_path` against this script's directory, absolutely."""
    joined = join(THIS_DIRECTORY, relative_path)
    return abspath(joined)
# COLLECT GLOBAL INFORMATION FROM SOURCES #####################################
# VERSION — single-line version string kept in the repository root.
with open(get_path('../VERSION'), 'r') as version_file:
    version = version_file.read().strip()
# CONFIG — the YAML configuration, parsed once at import time.
with open(get_path('../config.yml'), 'r', encoding='utf8') as config_file:
    config = yaml_load(config_file.read(), Loader=Loader)
# Date formats used for cache filenames, moments and CME timestamps.
FILE_DATE_FMT = "%Y-%m-%dT%H:%M:%S"
MOMENT_DATE_FMT = "%Y-%m-%dT%H:%M:%SZ"
CME_DATE_FMT = "%Y-%m-%dT%H:%MZ"
# Are we on the SSA instance for ESA?
SSA = environ.get('SSA') == 'true'
# NOTE: environment values are always strings, so the int 1 that used to sit
# in this membership test could never match and was removed.
DEBUG = environ.get('DEBUG') in ('true', 'True', '1')
# LOGGING #####################################################################
LOG_FILE = get_path('run.log')
# Application-wide logger. The logger itself is left wide open at DEBUG;
# filtering happens per-handler below.
hp_logger = logging.getLogger("HelioPropa")
hp_logger.setLevel(logging.DEBUG)
fileHandler = logging.FileHandler(LOG_FILE)
fileHandler.setFormatter(logging.Formatter(
    "%(asctime)s - %(levelname)s - %(message)s"
))
# set file messages depending on env var setting
# (DEBUG env var => everything goes to the file, otherwise only errors)
if DEBUG:
    fileHandler.setLevel(logging.DEBUG)
else:
    fileHandler.setLevel(logging.ERROR)
hp_logger.addHandler(fileHandler)
def init_console_logger():
    """Attach a console (stream) handler to the app logger and return it.

    The handler's verbosity follows the DEBUG toggle, mirroring the file
    handler configured above.
    """
    handler = logging.StreamHandler()
    handler.setLevel(logging.DEBUG if DEBUG else logging.ERROR)
    handler.setFormatter(
        logging.Formatter('CONSOLE: %(levelname)s - %(message)s')
    )
    # Add handlers to the logger
    hp_logger.addHandler(handler)
    return hp_logger
# HARDCODED CONFIGURATION #####################################################
# One astronomical unit, in kilometers.
ASTRONOMICAL_UNIT_IN_KM = 1.496e8
# Absolute path to the installed CDF library from https://cdf.gsfc.nasa.gov/
CDF_LIB = '/usr/local/lib/libcdf'
# Absolute path to the data cache directory
CACHE_DIR = get_path('../cache')
# These two configs are not in the YAML config because adding a new parameter
# will not work as-is, you'd have to edit some netcdf-related code.
# The slugs of the available parameters in the generated CSV files.
# The order matters. If you change this you also need to change the
# innermost loop of `get_data_for_target`.
# The javascript knows the targets' properties under these names.
# PROPERTIES = ('time', 'vrad', 'vtan', 'vtot', 'btan', 'brad', 'temp', 'pdyn', 'dens',
#               'atse', 'xhee', 'yhee')
PROPERTIES = ('time', 'vrad', 'vtan', 'vtot', 'btan', 'temp', 'pdyn', 'dens',
              'atse', 'xhee', 'yhee')
# The parameters that the users can handle.
# The slug MUST be one of the properties above.
# Per-entry keys:
#   slug     — identifier, must match a PROPERTIES member (and the dict key)
#   name     — short human-readable label
#   title    — longer description
#   units    — display units
#   active   — presumably whether the parameter is enabled by default in the
#              UI; confirm against the javascript that consumes this
#   position — looks like a display sort order (gaps left for insertions);
#              TODO confirm against the template/js
PARAMETERS = {
    'pdyn': {
        'slug': 'pdyn',
        'name': 'Dyn. Pressure',
        'title': 'The dynamic pressure.',
        'units': 'nPa',
        'active': True,
        'position': 10,
    },
    'vtot': {
        'slug': 'vtot',
        'name': 'Velocity',
        'title': 'The velocity of the particles.',
        'units': 'km/s',
        'active': False,
        'position': 20,
    },
    'btan': {
        'slug': 'btan',
        'name': 'B Tangential',
        'title': 'B Tangential.',
        'units': 'nT',
        'active': False,
        'position': 30,
    },
    # 'brad': {
    #     'slug': 'brad',
    #     'name': 'B Radial',
    #     'title': 'B Radial.',
    #     'units': 'nT',
    #     'active': False,
    #     'position': 35,
    # },
    'temp': {
        'slug': 'temp',
        'name': 'Temperature',
        'title': 'The temperature.',
        'units': 'eV',
        'active': False,
        'position': 40,
    },
    'dens': {
        'slug': 'dens',
        'name': 'Density',
        'title': 'The density N.',
        'units': 'cm^-3',
        'active': False,
        'position': 50,
    },
    'atse': {
        'slug': 'atse',
        'name': 'Angle T-S-E',
        'title': 'Angle Target-Sun-Earth.',
        'units': 'deg',
        'active': False,
        'position': 60,
    },
}
# SETUP ENVIRONMENT ###########################################################
# Point spacepy's data directory at our cache and expose the CDF library path
# (both are read from the environment by the spacepy/CDF tooling).
environ['SPACEPY'] = CACHE_DIR
environ['CDF_LIB'] = CDF_LIB
# SETUP FLASK ENGINE ##########################################################
app = Flask(__name__, root_path=THIS_DIRECTORY)
app.debug = DEBUG
if app.debug:
    hp_logger.info("Starting Flask app IN DEBUG MODE...")
else:
    hp_logger.info("Starting Flask app...")
def handle_error(e):
    """Log any uncaught exception and return it as plain text to the client."""
    hp_logger.error(e)
    # wish we could use the default error renderer here
    rendered = str(e)
    return rendered
app.register_error_handler(Exception, handle_error)
# SETUP JINJA2 TEMPLATE ENGINE ################################################
def static_global(filename):
    """Jinja2 global: build the URL of a static asset from its filename."""
    endpoint = 'static'
    return url_for(endpoint, filename=filename)
def shuffle_filter(seq):
    """
    This shuffles the sequence it is applied to.
    Jinja2 _should_ provide this.

    Returns a shuffled copy of `seq` (the input is not mutated). If `seq`
    cannot be turned into a list, it is returned unchanged, best-effort.
    """
    try:
        result = list(seq)
        random.shuffle(result)
        return result
    except Exception:
        # Was a bare `except:` — that also swallowed KeyboardInterrupt and
        # SystemExit. Keep the best-effort behavior for ordinary errors only.
        return seq
def markdown_filter(value, nl2br=False, p=True):
    """
    Convert markdown into HTML.

    nl2br: set to True to replace line breaks with <br/> tags.
    p: set to False to remove the enclosing <p></p> tags.

    NOTE(review): the body of this function was garbled in the source;
    only its tail (two .replace() calls stripping the enclosing tags and
    `return markdowned`) survived. This reconstruction should be checked
    against version-control history. TODO confirm.
    """
    from markdown import markdown  # presumably the `markdown` package — verify
    markdowned = markdown(value)
    if nl2br:
        markdowned = markdowned.replace("\n", "<br/>")
    if not p:
        markdowned = markdowned.replace(r"<p>", "").replace(r"</p>", "")
    return markdowned


# Map of characters that must be escaped inside inline JavaScript strings.
_js_escapes = {
    '\\': '\\u005C',
    '\'': '\\u0027',
    '"': '\\u0022',
    '>': '\\u003E',
    '<': '\\u003C',
    '&': '\\u0026',
    '=': '\\u003D',
    '-': '\\u002D',
    ';': '\\u003B',
    u'\u2028': '\\u2028',
    u'\u2029': '\\u2029'
}
# Escape every ASCII character with a value less than 32.
_js_escapes.update(('%c' % z, '\\u%04X' % z) for z in range(32))


def escapejs_filter(value):
    """Escape *value* for safe inclusion in a JavaScript string literal."""
    return markupsafe.Markup(
        "".join(_js_escapes.get(letter, letter) for letter in value)
    )


tpl_engine = Environment(loader=FileSystemLoader([get_path('view')]),
                         trim_blocks=True,
                         lstrip_blocks=True)
tpl_engine.globals.update(
    url_for=url_for,
    static=static_global,
)
tpl_engine.filters['markdown'] = markdown_filter
tpl_engine.filters['md'] = markdown_filter
tpl_engine.filters['shuffle'] = shuffle_filter
tpl_engine.filters['escapejs'] = escapejs_filter

# Variables injected into every rendered template.
tpl_global_vars = {
    'request': request,
    'version': version,
    'config': config,
    'now': datetime.datetime.now(),
    'is_esa': SSA,
}


# HELPERS #####################################################################
def abort(code, message):
    """Log *message* and abort the current request with HTTP *code*."""
    hp_logger.error("Abort: " + message)
    abort_flask(code, message)


def render_view(view, context=None):
    """
    A simple helper to render [view] template with [context] vars.
    It automatically adds the global template vars defined above, too.
    It returns a string, usually the HTML contents to display.
    """
    context = {} if context is None else context
    return tpl_engine.get_template(view).render(
        dict(list(tpl_global_vars.items()) + list(context.items()))
    )


# def render_page(page, title="My Page", context=None):
#     """
#     A simple helper to render the md_page.html template with [context] vars &
#     the additional contents of `page/[page].md` in the `md_page` variable.
#     It automagically adds the global template vars defined above, too.
#     It returns a string, usually the HTML contents to display.
#     """
#     if context is None:
#         context = {}
#     context['title'] = title
#     context['md_page'] = ''
#     with file(get_path('page/%s.md' % page)) as f:
#         context['md_page'] = f.read()
#     return tpl_engine.get_template('md_page.html').render(
#         dict(tpl_global_vars.items() + context.items())
#     )


def is_list_in_list(needle, haystack):
    """Return True when every element of *needle* is present in *haystack*."""
    return all(n in haystack for n in needle)


def round_time(dt=None, round_to=60):
    """
    Round a datetime object to any time lapse in seconds.
    dt : datetime.datetime object, default now.
    round_to : Closest number of seconds to round to, default 1 minute.
    """
    if dt is None:
        dt = datetime.datetime.now()
    seconds = (dt.replace(tzinfo=None) - dt.min).seconds
    rounding = (seconds + round_to / 2) // round_to * round_to
    return dt + datetime.timedelta(0, rounding - seconds, -dt.microsecond)


def datetime_from_list(time_list):
    """
    Datetimes in retrieved CDFs are stored as lists of numbers,
    with DayOfYear starting at 0. We want it starting at 1 because it's
    what vendor parsers use, both in python and javascript.
    """
    try:
        # Elements may be bytes; decode them to str first.
        time_list = [str(i, 'UTF8') for i in time_list]
    except Exception as e:
        hp_logger.error(e)
    # Day Of Year starts at 0, but for our datetime parser it starts at 1
    doy = '{:03d}'.format(int(''.join(time_list[4:7])) + 1)
    return datetime.datetime.strptime(
        "%s%s%s" % (''.join(time_list[0:4]), doy, ''.join(time_list[7:-1])),
        "%Y%j%H%M%S%f"
    )


# Override these using the model configuration in config.yml
default_nc_keys = {
    'hee': 'HEE',
    'vtot': 'V',
    'magn': 'B',
    'temp': 'T',
    'dens': 'N',
    'pdyn': 'P_dyn',
    'atse': 'Delta_angle',
}


def _sta_sto(_cnf, _sta, _sto):
    """Clamp the [_sta, _sto] interval to the model's configured bounds."""
    if 'started_at' in _cnf:
        _s0 = datetime.datetime.strptime(_cnf['started_at'], FILE_DATE_FMT)
        _s0 = max(_s0, _sta)
    else:
        _s0 = _sta
    if 'stopped_at' in _cnf:
        _s1 = datetime.datetime.strptime(_cnf['stopped_at'], FILE_DATE_FMT)
        _s1 = min(_s1, _sto)
    else:
        _s1 = _sto
    return _s0, _s1


def _read_var(_nc, _keys, _key, mandatory=False):
    """Read variable *_key* (mapped through *_keys*) from a NetCDF handle.

    Returns a list of None of matching length when the variable is absent
    and not mandatory.
    """
    try:
        return _nc.variables[_keys[_key]][:]
    except KeyError:
        pass
    if mandatory:
        raise Exception("No variable '%s' found in NetCDF." % _keys[_key])
    return [None] * len(_nc.variables['Time'])  # slow -- use numpy!


def get_local_filename(url):
    """
    Build the local cache filename for the distant file
    :param url: string
    :return: string
    """
    from slugify import slugify
    n = len('http://')
    if url.startswith('https'):
        n += 1
    s = url[n:]
    return slugify(s)


def get_target_config(slug):
    """Return the YAML configuration of the target *slug*, or raise."""
    for s in config['targets']:  # dumb linear scan; config is small
        if s['slug'] == slug:
            return s
    raise Exception("No target found in configuration for '%s'." % slug)


def check_target_config(slug):
    """Raise when no target *slug* exists in the configuration."""
    get_target_config(slug)


def get_active_targets():
    """Return the targets that are not locked in the configuration."""
    all_targets = config['targets']
    return [t for t in all_targets if not ('locked' in t and t['locked'])]


def validate_tap_target_config(target):
    """Ensure *target* has a usable `tap` configuration and return it."""
    tc = get_target_config(target)
    if 'tap' not in tc:
        raise Exception("No `tap` configuration for target `%s`." % target)
    if 'target_name' not in tc['tap']:
        raise Exception("No `target_name` in the `tap` configuration for target `%s`."
                        % target)
    return tc


# Using pyvo would be best.
# def retrieve_auroral_emissions_vopy(target_name):
#     api_url = "http://voparis-tap.obspm.fr/__system__/tap/run/tap/sync"
#     import pyvo as vo
#     service = vo.dal.TAPService(api_url)
#     # … can't figure out how to install pyvo and spacepy alongside (forking?)


def retrieve_auroral_emissions(target_name, d_started_at=None,
                               d_stopped_at=None):
    """
    Work In Progress.
    :param target_name: You should probably not let users define this value,
    as our sanitizing for ADQL may not be 100% safe.
    Use values from YAML configuration, instead.
    Below is a list of the ids we found to be existing.
    > SELECT DISTINCT target_name FROM apis.epn_core
    - Jupiter
    - Saturn
    - Mars
    - MERCURY
    - Titan
    - Io
    - VENUS
    - Ganymede
    - Uranus
    - Callisto
    - Europa
    https://pdssbn.astro.umd.edu/holdings/nh-p-rex-2-pluto-v1.0/document/codmac_level_definitions.pdf
    processing_level == 3
    API doc and sandbox:
    http://voparis-tap-planeto.obspm.fr/__system__/tap/run/info
    :return: list of dicts (time_min, time_max, thumbnail_url, external_link),
             or None when the query failed.
    """
    # Try out the form
    # http://voparis-tap-planeto.obspm.fr/__system__/adql/query/form
    api_url = "http://voparis-tap.obspm.fr/__system__/tap/run/tap/sync"

    if d_started_at is None:
        d_started_at = datetime.datetime.now()
        # Default to a two-year window ending now.
        t_started_at = time.mktime(d_started_at.timetuple()) \
            - 3600 * 24 * 365 * 2
        # t_started_at = 1
    else:
        t_started_at = time.mktime(d_started_at.timetuple())
    if d_stopped_at is None:
        d_stopped_at = datetime.datetime.now()
    t_stopped_at = time.mktime(d_stopped_at.timetuple())

    def timestamp_to_jday(timestamp):
        return timestamp / 86400.0 + 2440587.5

    def jday_to_timestamp(jday):
        return (jday - 2440587.5) * 86400.0

    def jday_to_datetime(jday):
        return datetime.datetime.utcfromtimestamp(jday_to_timestamp(jday))

    # SELECT DISTINCT dataproduct_type FROM apis.epn_core
    # > im sp sc
    query = """
SELECT time_min, time_max, thumbnail_url, external_link
FROM apis.epn_core
WHERE target_name='{target_name}'
AND dataproduct_type='im'
AND time_min >= {jday_start}
AND time_min <= {jday_stop}
ORDER BY time_min, granule_gid
""".format(
        target_name=target_name.replace("'", "\\'"),
        jday_start=timestamp_to_jday(t_started_at),
        jday_stop=timestamp_to_jday(t_stopped_at)
    )
    # AND processing_level = 4
    # query = """
    # SELECT DISTINCT target_name FROM apis.epn_core
    # """
    try:
        response = requests.post(api_url, {
            'REQUEST': 'doQuery',
            'LANG': 'ADQL',
            'QUERY': query,
            'TIMEOUT': '30',
            'FORMAT': 'VOTable/td'
        })
        response_xml = response.text
        import xml.etree.ElementTree as ET
        root = ET.fromstring(response_xml)
        namespaces = {'vo': 'http://www.ivoa.net/xml/VOTable/v1.3'}
        rows_xpath = "./vo:RESOURCE/vo:TABLE/vo:DATA/vo:TABLEDATA/vo:TR"
        rows = []
        for row in root.findall(rows_xpath, namespaces):
            rows.append({
                'time_min': jday_to_datetime(float(row[0].text)),
                'time_max': jday_to_datetime(float(row[1].text)),
                'thumbnail_url': row[2].text,
                'external_link': row[3].text,
            })
        return rows
    except Exception as e:
        # Log through the app logger instead of bare print(), for
        # consistency with the rest of the file.
        hp_logger.error("Failed to retrieve auroral emissions : %s" % e)
        # hp_logger.error(query)


def retrieve_amda_netcdf(orbiter, what, started_at, stopped_at):
    """
    Handles remote querying AMDA's API for URLs, and then downloading,
    extracting and caching the netCDF files.
    :param orbiter: key of the source in the YAML config
    :param what: either 'model' or 'orbit', a key in the config of the source
    :param started_at:
    :param stopped_at:
    :return: a list of local file paths to netCDF (.nc) files
    """
    url = config['amda'].format(
        dataSet=what,
        startTime=started_at.isoformat(),
        stopTime=stopped_at.isoformat()
    )
    hp_logger.info("Fetching remote gzip files list at '%s'." % url)
    retries = 0
    success = False
    errors = []
    remote_gzip_files = []
    while not success and retries < 3:
        try:
            response = urllib_request.urlopen(url)
            remote_gzip_files = json.loads(response.read())
            if not remote_gzip_files:
                raise Exception("Failed to fetch data at '%s'." % url)
            if remote_gzip_files == 'NODATASET':
                raise Exception("API says there's no dataset at '%s'." % url)
            if remote_gzip_files == 'ERROR':
                raise Exception("API returned an error at '%s'." % url)
            if remote_gzip_files == ['OUTOFTIME']:  # it happens
                return []
                # raise Exception("API says it's out of time at '%s'." % url)
            success = True
        except Exception as e:
            # BUGFIX: this used `e.message`, which does not exist in
            # Python 3 and raised an AttributeError masking the real error.
            hp_logger.warning("Failed (%d/3) '%s' : %s"
                              % (retries + 1, url, e))
            remote_gzip_files = []
            errors.append(e)
        finally:
            retries += 1

    if not remote_gzip_files:
        hp_logger.error("Failed to retrieve data from AMDA.")
        hp_logger.error("Failed to fetch gzip files list for %s at '%s' : %s"
                        % (orbiter, url, errors))
        abort(400, "Failed to fetch gzip files list for %s at '%s' : %s"
              % (orbiter, url, errors))
    else:
        remote_gzip_files = list(set(remote_gzip_files))

    hp_logger.debug("Fetched remote gzip files list : %s."
                    % str(remote_gzip_files))

    local_gzip_files = []
    for remote_gzip_file in remote_gzip_files:
        # hotfixes to remove when fixed upstream @Myriam
        if remote_gzip_file in ['OUTOFTIME', 'ERROR']:
            continue  # sometimes half the response is okay, the other not
        if remote_gzip_file.endswith('/.gz'):
            continue  # this is just a plain bug
        remote_gzip_file = remote_gzip_file.replace('cdpp1', 'cdpp', 1)
        ################################################
        local_gzip_file = join(CACHE_DIR, get_local_filename(remote_gzip_file))
        local_gzip_files.append(local_gzip_file)
        if not isfile(local_gzip_file):
            hp_logger.debug("Retrieving '%s'..." % local_gzip_file)
            urllib_request.urlretrieve(remote_gzip_file, local_gzip_file)
            hp_logger.debug("Retrieved '%s'." % local_gzip_file)
        else:
            hp_logger.debug("Found '%s' in the cache." % local_gzip_file)

    local_netc_files = []
    for local_gzip_file in local_gzip_files:
        local_netc_file = local_gzip_file[0:-3]
        hp_logger.debug("Unzipping '%s'..." % local_gzip_file)
        success = True
        try:
            with gzip.open(local_gzip_file) as f:
                file_content = f.read()
            with open(local_netc_file, 'w+b') as g:
                g.write(file_content)
        except Exception as e:
            success = False
            hp_logger.error("Cannot process gz file '%s' from '%s' : %s"
                            % (local_gzip_file, url, e))
            # Sometimes, the downloaded gz is corrupted, and CRC checks fail.
            # We want to delete the local gz file and try again next time.
            removefile(local_gzip_file)
        if success:
            local_netc_files.append(local_netc_file)
            hp_logger.debug("Unzipped '%s'." % local_gzip_file)

    return list(set(local_netc_files))  # remove possible dupes


# class DataParser:
#     """
#     Default data parser
#     A wip to try to handle code exeptions sanely.
#     """
#
#     # Override these using the model configuration
#     default_nc_keys = {
#         'hee': 'HEE',
#         'vtot': 'V',
#         'magn': 'B',
#         'temp': 'T',
#         'dens': 'N',
#         'pdyn': 'P_dyn',
#         'atse': 'Delta_angle',
#     }
#
#     def __init__(self, target, model):
#         self.target = target
#         self.model = model
#         pass
#
#     def _read_var(self, nc, _keys, _key, mandatory=False):
#         try:
#             return nc.variables[_keys[_key]]
#         except KeyError:
#             pass
#         if mandatory:
#             raise Exception("No variable '%s' found in NetCDF." % _keys[_key])
#         return [None] * len(nc.variables['Time'])  # slow -- use numpy?
#
#     def parse(self, cdf_handle):
#         nc_keys = self.default_nc_keys.copy()
#
#         times = cdf_handle.variables['Time']  # YYYY DOY HH MM SS .ms
#         data_v = self._read_var(cdf_handle, nc_keys, 'vtot')
#         data_b = self._read_var(cdf_handle, nc_keys, 'magn')
#         data_t = self._read_var(cdf_handle, nc_keys, 'temp')
#         data_n = self._read_var(cdf_handle, nc_keys, 'dens')
#         data_p = self._read_var(cdf_handle, nc_keys, 'pdyn')
#         data_a = self._read_var(cdf_handle, nc_keys, 'atse')
#
#         return zip()


def get_data_for_target(target_config, input_slug,
                        started_at, stopped_at):
    """
    :return: dict whose keys are datetime as str, values tuples of data
    """
    hp_logger.debug("Grabbing data for '%s'..." % target_config['slug'])
    try:
        models = target_config['models'][input_slug]
    except Exception as e:
        abort(500, "Invalid model configuration for '%s' : %s"
              % (target_config['slug'], str(e)))
    try:
        orbits = target_config['orbit']['models']
    except KeyError:
        orbits = []
        # abort(500, "Invalid orbit configuration for '%s' : %s"
        #       % (target_config['slug'], str(e)))

    precision = "%Y-%m-%dT%H"  # model and orbits times are only equal-ish
    orbit_data = {}  # keys are datetime as str, values arrays of XY
    for orbit in orbits:
        s0, s1 = _sta_sto(orbit, started_at, stopped_at)
        nc_keys = default_nc_keys.copy()
        if 'parameters' in orbit:
            nc_keys.update(orbit['parameters'])
        orbit_files = retrieve_amda_netcdf(
            target_config['slug'], orbit['slug'], s0, s1
        )
        for orbit_file in orbit_files:
            hp_logger.debug("%s: opening orbit NETCDF4 '%s'..."
                            % (target_config['name'], orbit_file))
            cdf_handle = Dataset(orbit_file, "r", format="NETCDF4")
            times = cdf_handle.variables['Time']  # YYYY DOY HH MM SS .ms
            data_hee = _read_var(cdf_handle, nc_keys, 'hee', mandatory=True)
            hp_logger.debug("%s: aggregating data from '%s'..."
                            % (target_config['name'], orbit_file))
            for ltime, datum_hee in zip(times, data_hee):
                try:
                    dtime = datetime_from_list(ltime)
                except Exception:
                    hp_logger.error(
                        "Failed to parse time from get__data_for_target %s."
                        % ltime)
                    raise
                # Keep only what's in the interval
                if s0 <= dtime <= s1:
                    dkey = round_time(dtime, 60 * 60).strftime(precision)
                    orbit_data[dkey] = datum_hee
            cdf_handle.close()

    all_data = {}  # keys are datetime as str, values tuples of data
    for model in models:
        s0, s1 = _sta_sto(model, started_at, stopped_at)
        model_files = retrieve_amda_netcdf(
            target_config['slug'], model['slug'], s0, s1
        )
        nc_keys = default_nc_keys.copy()
        if 'parameters' in model:
            nc_keys.update(model['parameters'])
        if len(model_files) == 0:
            hp_logger.warning("No model data for '%s' '%s'."
                              % (target_config['slug'], model['slug']))

        for model_file in model_files:
            hp_logger.debug("%s: opening model NETCDF4 '%s'..."
                            % (target_config['name'], model_file))
            cdf_handle = Dataset(model_file, "r", format="NETCDF4")
            # log.debug(cdf_handle.variables.keys())
            times = cdf_handle.variables['Time']  # YYYY DOY HH MM SS .ms
            data_v = _read_var(cdf_handle, nc_keys, 'vtot')
            data_b = _read_var(cdf_handle, nc_keys, 'magn')
            data_t = _read_var(cdf_handle, nc_keys, 'temp')
            data_n = _read_var(cdf_handle, nc_keys, 'dens')
            data_p = _read_var(cdf_handle, nc_keys, 'pdyn')
            data_a = _read_var(cdf_handle, nc_keys, 'atse')

            # Usually:
            # Time, StartTime, StopTime, V, B, N, T, Delta_angle, P_dyn
            # Earth:
            # Time, BartelsNumber, ImfID, SwID, ImfPoints,
            # SwPoints, B_M_av, B_Vec_av, B_Theta_av,
            # B_Phi_av, B, T, N, V, Vlat, Vlon,
            # Alpha, RamP, E, Beta, Ma, Kp, R, DST,
            # AE, Flux, Flag, F10_Index, StartTime, StopTime

            # Don't ignore, but instead, compute a mean ?
            # Look for discretisation ? temporal drizzling ? 1D drizzling ?
            # The easy linear mean feels a bit rough too. Running mean too.
            # FIXME
            ignored_count = 0
            hp_logger.debug("%s: aggregating data from '%s'..."
                            % (target_config['name'], model_file))
            for ltime, datum_v, datum_b, datum_t, datum_n, datum_p, datum_a \
                    in zip(times, data_v, data_b, data_t,
                           data_n, data_p, data_a):
                try:
                    dtime = datetime_from_list(ltime)
                except Exception:
                    hp_logger.error("Failed to parse time from %s." % ltime)
                    raise
                if not (s0 <= dtime <= s1):
                    continue  # Cull what's out of the interval
                droundtime = round_time(dtime, 60 * 60)
                dkey = droundtime.strftime(precision)
                x_hee = None
                y_hee = None
                if dkey in orbit_data:
                    x_hee = orbit_data[dkey][0] / ASTRONOMICAL_UNIT_IN_KM
                    y_hee = orbit_data[dkey][1] / ASTRONOMICAL_UNIT_IN_KM
                # First exception: V may be a vector instead of a scalar
                if hasattr(datum_v, '__len__'):
                    vrad = datum_v[0]
                    vtan = datum_v[1]
                    vtot = sqrt(vrad * vrad + vtan * vtan)
                else:  # eg: Earth
                    vrad = None
                    vtan = None
                    vtot = datum_v
                # Second exception: Earth is always at (1, 0)
                if target_config['slug'] == 'earth':
                    x_hee = 1
                    y_hee = 0
                # Third exception: B is a Vector3 or a Vector2 for Earth
                if target_config['slug'] == 'earth':
                    if model['slug'] == 'omni_hour_all':
                        # Vector3
                        datum_b = datum_b[0]
                    # if model['slug'] == 'ace_swepam_real':
                    #     # Vector2
                    #     datum_b = datum_b[0]
                    if model['slug'] == 'ace_swepam_real_1h':
                        datum_p = datum_n * vtot * vtot * 1.6726e-6
                        if vtot is None or isnan(vtot):
                            continue
                # Fourth exception: Earth temp is in K, not eV
                # @nandre: à vérifier ! Where is la constante de Boltzmann ?
                if target_config['slug'] == 'earth' and datum_t:
                    datum_t = datum_t / 11605.0
                # Keep adding exceptions here until you can't or become mad

                # Ignore bad data
                # if numpy.isnan(datum_t):
                #     continue

                # Crude/Bad drizzling condition : first found datum in the
                # hour gets to set the data. Improve this.
                if dkey not in all_data:
                    # /!. The Set value MUST be in the same order as PROPERTIES
                    all_data[dkey] = (
                        dtime.strftime("%Y-%m-%dT%H:%M:%S+00:00"),
                        vrad, vtan, vtot,
                        datum_b, datum_t, datum_p, datum_n, datum_a,
                        x_hee, y_hee
                    )
                else:
                    ignored_count += 1
            # Improve this loop so as to remove this stinky debug log
            if ignored_count > 0:
                hp_logger.debug(" Ignored %d datum(s) during ~\"drizzling\"."
                                % ignored_count)
            cdf_handle.close()

    return all_data


def generate_csv_contents(target_slug, input_slug, started_at, stopped_at):
    """Build the CSV contents (legacy AMDA-netCDF pipeline) as a string."""
    target_config = get_target_config(target_slug)
    hp_logger.debug("Crunching CSV contents for '%s'..."
                    % target_config['name'])
    si = StringIO()
    cw = csv_writer(si)
    cw.writerow(PROPERTIES)
    all_data = get_data_for_target(
        target_config=target_config,
        input_slug=input_slug,
        started_at=started_at,
        stopped_at=stopped_at
    )
    hp_logger.debug("Writing and sorting CSV for '%s'..."
                    % target_config['slug'])
    for dkey in sorted(all_data):
        cw.writerow(all_data[dkey])
    hp_logger.info("Generated CSV contents for '%s'."
                   % target_config['slug'])
    return si.getvalue()


def generate_csv_contents_spz(target_slug, input_slug,
                              started_at, stopped_at):
    """Build the CSV contents using Speasy/AMDA, as a string."""
    now = datetime.datetime.now()
    # BUGFIX: this log line said "between {stopped_at} {stopped_at}".
    hp_logger.debug(
        f"Generating csv content with Speasy for {target_slug}/{input_slug} between {started_at} {stopped_at}")
    target_config = get_target_config(target_slug)
    plasma_params = {}
    for plasma_struct in target_config['models'][input_slug]:
        plasma_params = {**plasma_params, **plasma_struct['parameters']}
    orbit_params = {}
    for orbit_struct in target_config['orbit']['models']:
        orbit_params = {**orbit_params, **orbit_struct['parameters']}
    parameters_dict = {**plasma_params, **orbit_params}
    hp_logger.info(f"Aggregating dataframes speazy parameters for '{input_slug}' to '{target_slug}'")
    list_df = []
    for _name, _id in parameters_dict.items():
        hp_logger.debug(f"Getting parameter id '{_id}' for '{_name}'")
        amda_dataset = amda.get_data(_id, started_at, stopped_at)
        if amda_dataset is None:
            continue
        _df = amda_dataset.to_dataframe()
        if _name == 'xy_v':
            _df = _df.rename(columns={_df.columns[0]: 'vrad',
                                      _df.columns[1]: 'vtan'})
        elif _name == 'xy_hee':
            _df = _df.drop(_df.columns[2], axis=1)
            _df = _df.rename(columns={_df.columns[0]: 'xhee',
                                      _df.columns[1]: 'yhee'})
        elif _name == 'xy_b':
            _df = _df.drop(_df.columns[2], axis=1)
            _df = _df.rename(columns={_df.columns[0]: 'brad',
                                      _df.columns[1]: 'btan'})
        else:
            _df = _df.rename(columns={_df.columns[0]: _name})
        # _df = _df[~_df.index.duplicated()]
        # resample to frequency, for later concatenation
        _df = _df.resample('1H').mean()
        # _df = _df.loc[_df.first_valid_index():_df.last_valid_index()]
        list_df.append(_df)

    # (sqrt is already imported from math at the top of this file)
    final_df = pd.concat(list_df, axis=1)
    # build vtot param if doesnt exist
    if 'vtot' not in final_df.columns \
            and 'vtan' in final_df.columns \
            and 'vrad' in final_df.columns:
        final_df['vtot'] = final_df.apply(
            lambda x: sqrt(x.vtan * x.vtan + x.vrad * x.vrad), axis=1)
    # Hardcoded earth coordinates
    if target_slug == 'earth':
        final_df['xhee'] = 1
        final_df['yhee'] = 0
    cols_ordered = ['vrad', 'vtan', 'vtot', 'btan', 'brad', 'temp',
                    'pdyn', 'dens', 'atse', 'xhee', 'yhee']
    cols_ordered = [_c for _c in cols_ordered if _c in final_df.columns]
    final_df = final_df[cols_ordered]
    final_df.index.name = 'time'
    then = datetime.datetime.now()
    hp_logger.debug(f"Took {then - now} to generate")
    return final_df.to_csv(date_format='%Y-%m-%dT%H:%M:%S+00:00',
                           float_format="%.5f",
                           header=True, sep=",")


def generate_csv_file_if_needed(target_slug, input_slug,
                                started_at, stopped_at):
    """Generate (and cache) the CSV file for the given request, if absent."""
    filename = "%s_%s_%s_%s.csv" % (target_slug, input_slug,
                                    started_at.strftime(FILE_DATE_FMT),
                                    stopped_at.strftime(FILE_DATE_FMT))
    local_csv_file = join(CACHE_DIR, filename)

    skip_generation = False
    # It needs to have more than one line to not be empty (headers)
    if isfile(local_csv_file):
        with open(local_csv_file) as f:
            # BUGFIX: was `> 0`, which also skipped header-only (empty) files,
            # contradicting the comment above.
            skip_generation = len(f.readlines()) > 1

    if skip_generation:
        hp_logger.debug(f"{local_csv_file} already exists")
        return None
    else:
        hp_logger.debug(f"{local_csv_file} doesnt exist or is empty")

    # temporary switch while migrating each target to spz
    if target_slug in ['rosetta', 'juno', 'p67']:
        csv_generator = generate_csv_contents
    else:
        csv_generator = generate_csv_contents_spz

    hp_logger.info("Generating CSV '%s'..." % local_csv_file)
    try:
        with open(local_csv_file, mode="w+") as f:
            f.write(csv_generator(
                target_slug=target_slug,
                input_slug=input_slug,
                started_at=started_at,
                stopped_at=stopped_at
            ))
        hp_logger.info("Generation of '%s' done." % filename)
    except Exception as e:
        from sys import exc_info
        from traceback import extract_tb
        exc_type, exc_value, exc_traceback = exc_info()
        hp_logger.error(e)
        for trace in extract_tb(exc_traceback):
            hp_logger.error(trace)
        if isfile(local_csv_file):
            hp_logger.warning("Removing failed CSV '%s'..." % local_csv_file)
            removefile(local_csv_file)
        abort(500, "Failed creating CSV '%s' : %s" % (filename, e))


def remove_all_files(in_directory):
    """
    Will throw if something horrible happens.
    Does not remove recursively (could be done with os.walk if needed).
    Does not remove directories either.
    :param in_directory: absolute path to directory
    :return: list of removed file paths
    """
    import os
    if not os.path.isdir(in_directory):
        # BUGFIX: the '%s' placeholder was never filled in.
        raise ValueError("No directory to clean at '%s'." % in_directory)
    removed_files = []
    for file_name in os.listdir(in_directory):
        file_path = os.path.join(in_directory, file_name)
        if os.path.isfile(file_path):
            os.remove(file_path)
            removed_files.append(file_path)
    return removed_files


def remove_files_created_before(date, in_directory):
    """
    Will throw if something horrible happens.
    Does not remove recursively (could be done with os.walk if needed).
    Does not remove directories either.
    :param date: datetime object
    :param in_directory: absolute path to directory
    :return: list of removed file paths
    """
    import os
    import time
    secs = time.mktime(date.timetuple())
    if not os.path.isdir(in_directory):
        # BUGFIX: the '%s' placeholder was never filled in.
        raise ValueError("No directory to clean at '%s'." % in_directory)
    removed_files = []
    for file_name in os.listdir(in_directory):
        file_path = os.path.join(in_directory, file_name)
        if os.path.isfile(file_path):
            t = os.stat(file_path)
            if t.st_ctime < secs:
                os.remove(file_path)
                removed_files.append(file_path)
    return removed_files


def get_input_slug_from_query(inp=None):
    """Return a valid input slug from the query string (or *inp*),
    falling back to the configured default."""
    if inp is None:
        input_slug = request.args.get('input_slug',
                                      config['defaults']['input_slug'])
    else:
        input_slug = inp
    if input_slug not in [i['slug'] for i in config['inputs']]:
        # be tolerant instead of yelling loudly
        input_slug = config['defaults']['input_slug']
    return input_slug


def get_interval_from_query():
    """
    Get the interval from the query, or from defaults
    (one month before and after today). Returns ISO date strings.
    """
    before = relativedelta(months=1)
    after = relativedelta(months=1)
    today = datetime.datetime.now().replace(hour=0, minute=0, second=0)
    started_at = today - before
    stopped_at = today + after
    default_started_at = started_at.strftime(FILE_DATE_FMT)
    default_stopped_at = stopped_at.strftime(FILE_DATE_FMT)
    started_at = request.args.get('started_at', default_started_at)
    stopped_at = request.args.get('stopped_at', default_stopped_at)
    d_started_at = dateparser.isoparse(started_at)
    d_stopped_at = dateparser.isoparse(stopped_at)
    started_at = d_started_at.strftime(FILE_DATE_FMT)
    stopped_at = d_stopped_at.strftime(FILE_DATE_FMT)
    return started_at, stopped_at


def get_catalog_layers(input_slug, target_slug, started_at, stopped_at):
    """
    In the JSON file we have "columns" and "data".
    Of course, each JSON file has its own columns, with different conventions.
    :param input_slug:
    :param target_slug:
    :return: dict of layer slug -> list of {'start': ..., 'stop': ...}
    """
    import json

    def _get_index_of_key(_data, _key):
        try:
            index = _data['columns'].index(_key)
        except ValueError:
            hp_logger.error("Key %s not found in columns of %s"
                            % (_key, _data))
            raise
        return index

    try:
        started_at = datetime.datetime.strptime(started_at, FILE_DATE_FMT)
    except (ValueError, TypeError):  # was a bare except
        abort(400, "Invalid started_at parameter : '%s'." % started_at)
    try:
        stopped_at = datetime.datetime.strptime(stopped_at, FILE_DATE_FMT)
    except (ValueError, TypeError):  # was a bare except
        abort(400, "Invalid stopped_at parameter : '%s'." % stopped_at)

    catalog_layers = {}
    for config_layer in config['layers']['catalogs']:
        if 'data' not in config_layer:
            continue
        catalog_layers[config_layer['slug']] = []
        for cl_datum in config_layer['data']:
            if input_slug not in cl_datum:
                continue
            if target_slug not in cl_datum[input_slug]:
                continue
            if cl_datum[input_slug][target_slug] is None:
                # We used ~ in the config, there are no constraints
                constraints = []
            else:
                constraints = cl_datum[input_slug][target_slug]['constraints']
            with open(get_path("../data/catalog/%s" % cl_datum['file'])) as f:
                json_data = json.load(f)
            if 'start' not in cl_datum:
                hp_logger.error("Invalid configuration: 'start' is missing.")
                continue  # skip this
            if 'format' not in cl_datum:
                cl_datum['format'] = CME_DATE_FMT
                # log.error("Invalid configuration: 'format' is missing.")
                # continue  # skip this
            start_index = _get_index_of_key(json_data, cl_datum['start'])
            if 'stop' not in cl_datum:
                stop_index = start_index
            else:
                stop_index = _get_index_of_key(json_data, cl_datum['stop'])
            for json_datum in json_data['data']:
                validates_any_constraint = False
                if 0 == len(constraints):
                    validates_any_constraint = True
                for constraint in constraints:
                    validates_constraint = True
                    for key, possible_values in iter(constraint.items()):
                        actual_value = json_datum[_get_index_of_key(
                            json_data, key
                        )]
                        if actual_value not in possible_values:
                            validates_constraint = False
                            break
                    if validates_constraint:
                        validates_any_constraint = True
                        break
                if not validates_any_constraint:
                    continue
                start_time = json_datum[start_index]
                stop_time = json_datum[stop_index]
                start_time = datetime.datetime.strptime(
                    start_time, cl_datum['format']
                )
                stop_time = datetime.datetime.strptime(
                    stop_time, cl_datum['format']
                )
                # NOTE(review): only events starting after started_at are
                # kept; stopped_at is never checked — confirm this is
                # intentional.
                if start_time < started_at:
                    continue
                catalog_layers[config_layer['slug']].append({
                    'start': start_time.strftime(MOMENT_DATE_FMT),
                    'stop': stop_time.strftime(MOMENT_DATE_FMT),
                })
    return catalog_layers


def get_hit_counter():
    """Return the current visit count from the ../VISITS file."""
    hit_count_path = get_path("../VISITS")
    if isfile(hit_count_path):
        # Use a context manager so the file handle is always closed.
        with open(hit_count_path) as f:
            hit_count = int(f.read())
    else:
        hit_count = 1
    return hit_count


def increment_hit_counter():
    """Increment and persist the visit count; return the new value.

    NOTE(review): read-modify-write is not atomic — concurrent requests
    may lose increments. Acceptable for a rough counter.
    """
    hit_count_path = get_path("../VISITS")
    if isfile(hit_count_path):
        with open(hit_count_path) as f:
            hit_count = int(f.read())
        hit_count += 1
    else:
        hit_count = 1
    with open(hit_count_path, 'w') as hit_counter_file:
        hit_counter_file.write(str(hit_count))
    return hit_count


def update_spacepy():
    """
    Importing pycdf will fail if the toolbox is not up to date.
    """
    try:
        hp_logger.info("Updating spacepy's toolbox…")
        import spacepy.toolbox
        spacepy.toolbox.update()
    except Exception as e:
        hp_logger.error("Failed to update spacepy : %s." % e)


tpl_global_vars['visits'] = get_hit_counter()


# ROUTING #####################################################################
@app.route('/favicon.ico')
def favicon():  # we want it served from the root, not from static/
    return send_from_directory(
        join(app.root_path, 'static', 'img'),
        'favicon.ico', mimetype='image/vnd.microsoft.icon'
    )


@app.route("/")
@app.route("/home.html")
@app.route("/index.html")
def home():
    increment_hit_counter()
    parameters = PARAMETERS.values()
    parameters = sorted(parameters, key=lambda x: x['position'])
    input_slug = get_input_slug_from_query()
    # Consistency fix: use the tolerant helper instead of t['locked'],
    # which raised KeyError on targets missing the 'locked' key.
    targets = get_active_targets()
    started_at, stopped_at = get_interval_from_query()
    for i, target in enumerate(targets):
        targets[i]['catalog_layers'] = get_catalog_layers(
            input_slug, target['slug'], started_at, stopped_at
        )
    return render_view('home.html.jinja2', {
        # 'targets': config['targets'],
        'targets': targets,
        'parameters': parameters,
        'input_slug': input_slug,
        'started_at': started_at,
        'stopped_at': stopped_at,
        'planets': [s for s in config['targets'] if s['type'] == 'planet'],
        'probes': [s for s in config['targets'] if s['type'] == 'probe'],
        'comets': [s for s in config['targets'] if s['type'] == 'comet'],
        'visits': get_hit_counter(),
    })


@app.route("/about.html")
def about():
    import uuid
    increment_hit_counter()
    return render_view('about.html.jinja2', {
        'authors_emails': [a['mail'] for a in config['authors']],
        'uuid4': str(uuid.uuid4())[0:3],
        'visits': get_hit_counter(),
    })


@app.route("/help.html")
def help():
    return render_view('help.html.jinja2', {
        'visits': get_hit_counter(),
    })


# FIXME(review): an entire route definition was lost/garbled in this source
# at this point. The surviving fragment referenced a `contents` variable
# (`@app.route("/" + contents + ""`), and most likely served the generated
# CSV data. Recover it from version control.


@app.route("/log/clear")
def log_clear():
    with open(LOG_FILE, 'w') as f:
        f.truncate()
    return "Log cleared successfully."


# DEV TOOLS ###################################################################

# @app.route("/inspect")
# def analyze_cdf():
#     """
#     For debug purposes.
# """ # cdf_to_inspect = get_path("../res/dummy.nc") # cdf_to_inspect = get_path("../res/dummy_jupiter_coordinates.nc") # # si = StringIO() # cw = csv.DictWriter(si, fieldnames=['Name', 'Shape', 'Length']) # cw.writeheader() # # # Time, StartTime, StopTime, V, B, N, T, Delta_angle, P_dyn, QualityFlag # cdf_handle = Dataset(cdf_to_inspect, "r", format="NETCDF4") # for variable in cdf_handle.variables: # v = cdf_handle.variables[variable] # cw.writerow({ # 'Name': variable, # 'Shape': v.shape, # 'Length': v.size, # }) # cdf_handle.close() # # return si.getvalue() # MAIN ######################################################################## if __name__ == "__main__": # Debug mode is on, as the production server does not use this but run.wsgi extra_files = [get_path('../config.yml')] app.run(debug=True, extra_files=extra_files)