# coding=utf-8 import StringIO import datetime import gzip import json import logging import random import tarfile import time import urllib import requests import re # regex from csv import writer as csv_writer, DictWriter as csv_dict_writer from math import sqrt, isnan from os import environ, remove as removefile from os.path import isfile, join, abspath, dirname # (i) Just ignore the requirements warning, we already require `dateutils`. from dateutil.relativedelta import relativedelta from dateutil import parser as dateparser from flask import Flask from flask import request from flask import url_for, send_from_directory, abort as abort_flask from jinja2 import Environment, FileSystemLoader, Markup from yaml import load as yaml_load from netCDF4 import Dataset, date2num # PATH RELATIVITY ############################################################# THIS_DIRECTORY = dirname(abspath(__file__)) def get_path(relative_path): """Get an absolute path from the relative path to this script directory.""" return abspath(join(THIS_DIRECTORY, relative_path)) # COLLECT GLOBAL INFORMATION FROM SOURCES ##################################### # VERSION with open(get_path('../VERSION'), 'r') as version_file: version = version_file.read().strip() # CONFIG with open(get_path('../config.yml'), 'r') as config_file: config = yaml_load(config_file.read()) FILE_DATE_FMT = "%Y-%m-%dT%H:%M:%S" MOMENT_DATE_FMT = "%Y-%m-%dT%H:%M:%SZ" CME_DATE_FMT = "%Y-%m-%dT%H:%MZ" # Are we on the SSA instance for ESA? SSA = environ.get('SSA') == 'true' DEBUG = environ.get('DEBUG') == 'true' # SSA = True # LOGGING ##################################################################### LOG_FILE = get_path('run.log') log = logging.getLogger("HelioPropa") if DEBUG: log.setLevel(logging.DEBUG) else: log.setLevel(logging.ERROR) logHandler = logging.FileHandler(LOG_FILE) logHandler.setFormatter(logging.Formatter( "%(asctime)s - %(levelname)s - %(message)s" )) log.addHandler(logHandler) # HARDCODED CONFIGURATION ##################################################### ASTRONOMICAL_UNIT_IN_KM = 1.496e8 # Absolute path to the installed CDF library from https://cdf.gsfc.nasa.gov/ CDF_LIB = '/usr/local/lib/libcdf' # Absolute path to the data cache directory CACHE_DIR = get_path('../cache') # These two configs are not in the YAML config because adding a new parameter # will not work as-is, you'd have to edit some netcdf-related code. # The slugs of the available parameters in the generated CSV files. # The order matters. If you change this you also need to change the # innermost loop of `get_data_for_target`. # The javascript knows the targets' properties under these names. PROPERTIES = ('time', 'vrad', 'vtan', 'vtot', 'btan', 'temp', 'pdyn', 'dens', 'atse', 'xhee', 'yhee') # The parameters that the users can handle. # The slug MUST be one of the properties above. PARAMETERS = { 'pdyn': { 'slug': 'pdyn', 'name': 'Dyn. Pressure', 'title': 'The dynamic pressure.', 'units': 'nPa', 'active': True, 'position': 10, }, 'vtot': { 'slug': 'vtot', 'name': 'Velocity', 'title': 'The velocity of the particles.', 'units': 'km/s', 'active': False, 'position': 20, }, 'btan': { 'slug': 'btan', 'name': 'B Tangential', 'title': 'B Tangential.', 'units': 'nT', 'active': False, 'position': 30, }, 'temp': { 'slug': 'temp', 'name': 'Temperature', 'title': 'The temperature.', 'units': 'eV', 'active': False, 'position': 40, }, 'dens': { 'slug': 'dens', 'name': 'Density', 'title': 'The density N.', 'units': 'cm^-3', 'active': False, 'position': 50, }, 'atse': { 'slug': 'atse', 'name': 'Angle T-S-E', 'title': 'Angle Target-Sun-Earth.', 'units': 'deg', 'active': False, 'position': 60, }, } # SETUP ENVIRONMENT ########################################################### environ['SPACEPY'] = CACHE_DIR environ['CDF_LIB'] = CDF_LIB # SETUP FLASK ENGINE ########################################################## app = Flask(__name__, root_path=THIS_DIRECTORY) app.debug = DEBUG if app.debug: log.info("Starting Flask app IN DEBUG MODE...") else: log.info("Starting Flask app...") def handle_error(e): log.error(e) return str(e) # wish we could use the default error renderer here app.register_error_handler(Exception, handle_error) # SETUP JINJA2 TEMPLATE ENGINE ################################################ def static_global(filename): return url_for('static', filename=filename) def shuffle_filter(seq): """ This shuffles the sequence it is applied to. Jinja2 _should_ provide this. """ try: result = list(seq) random.shuffle(result) return result except: return seq def markdown_filter(value, nl2br=False, p=True): """ Converts markdown into html. nl2br: set to True to replace line breaks with <br> tags p: set to False to remove the enclosing <p></p> tags """ from markdown import markdown from markdown.extensions.nl2br import Nl2BrExtension from markdown.extensions.abbr import AbbrExtension extensions = [AbbrExtension()] if nl2br is True: extensions.append(Nl2BrExtension()) markdowned = markdown(value, output_format='html5', extensions=extensions) if p is False: markdowned = markdowned.replace(r"<p>", "").replace(r"</p>", "") return markdowned _js_escapes = { '\\': '\\u005C', '\'': '\\u0027', '"': '\\u0022', '>': '\\u003E', '<': '\\u003C', '&': '\\u0026', '=': '\\u003D', '-': '\\u002D', ';': '\\u003B', u'\u2028': '\\u2028', u'\u2029': '\\u2029' } # Escape every ASCII character with a value less than 32. _js_escapes.update(('%c' % z, '\\u%04X' % z) for z in xrange(32)) def escapejs_filter(value): escaped = [] for letter in value: if letter in _js_escapes: escaped.append(_js_escapes[letter]) else: escaped.append(letter) return Markup("".join(escaped)) tpl_engine = Environment(loader=FileSystemLoader([get_path('view')]), trim_blocks=True, lstrip_blocks=True) tpl_engine.globals.update( url_for=url_for, static=static_global, ) tpl_engine.filters['markdown'] = markdown_filter tpl_engine.filters['md'] = markdown_filter tpl_engine.filters['shuffle'] = shuffle_filter tpl_engine.filters['escapejs'] = escapejs_filter tpl_global_vars = { 'request': request, 'version': version, 'config': config, 'now': datetime.datetime.now(), 'is_esa': SSA, } # HELPERS ##################################################################### def abort(code, message): log.error("Abort: " + message) abort_flask(code, message) def render_view(view, context=None): """ A simple helper to render [view] template with [context] vars. It automatically adds the global template vars defined above, too. It returns a string, usually the HTML contents to display. """ context = {} if context is None else context return tpl_engine.get_template(view).render( dict(tpl_global_vars.items() + context.items()) ) # def render_page(page, title="My Page", context=None): # """ # A simple helper to render the md_page.html template with [context] vars & # the additional contents of `page/[page].md` in the `md_page` variable. # It automagically adds the global template vars defined above, too. # It returns a string, usually the HTML contents to display. # """ # if context is None: # context = {} # context['title'] = title # context['md_page'] = '' # with file(get_path('page/%s.md' % page)) as f: # context['md_page'] = f.read() # return tpl_engine.get_template('md_page.html').render( # dict(tpl_global_vars.items() + context.items()) # ) def is_list_in_list(needle, haystack): for n in needle: if n not in haystack: return False return True def round_time(dt=None, round_to=60): """ Round a datetime object to any time laps in seconds dt : datetime.datetime object, default now. roundTo : Closest number of seconds to round to, default 1 minute. """ if dt is None: dt = datetime.datetime.now() seconds = (dt.replace(tzinfo=None) - dt.min).seconds rounding = (seconds + round_to / 2) // round_to * round_to return dt + datetime.timedelta(0, rounding-seconds, -dt.microsecond) def datetime_from_list(time_list): """ Datetimes in retrieved CDFs are stored as lists of numbers, with DayOfYear starting at 0. We want it starting at 1 because it's what vendor parsers use, both in python and javascript. """ # Day Of Year starts at 0, but for our datetime parser it starts at 1 doy = '{:03d}'.format(int(''.join(time_list[4:7])) + 1) return datetime.datetime.strptime( "%s%s%s" % (''.join(time_list[0:4]), doy, ''.join(time_list[7:-1])), "%Y%j%H%M%S%f" ) def get_local_filename(url): """ Build the local cache filename for the distant file :param url: string :return: string """ from slugify import slugify n = len('http://') if url.startswith('https'): n += 1 s = url[n:] return slugify(s) def get_target_config(slug): for s in config['targets']: # dumb if s['slug'] == slug: return s raise Exception("No target found in configuration for '%s'." % slug) def check_target_config(slug): get_target_config(slug) def get_active_targets(): all_targets = config['targets'] return [t for t in all_targets if not ('locked' in t and t['locked'])] def validate_tap_target_config(target): tc = get_target_config(target) if 'tap' not in tc: raise Exception("No `tap` configuration for target `%s`." % target) if 'target_name' not in tc['tap']: raise Exception("No `target_name` in the `tap` configuration for target `%s`." % target) return tc # Using pyvo would be best. # def retrieve_auroral_emissions_vopy(target_name): # api_url = "http://voparis-tap.obspm.fr/__system__/tap/run/tap/sync" # import pyvo as vo # service = vo.dal.TAPService(api_url) # # … can't figure out how to install pyvo and spacepy alongside (forking?) def retrieve_auroral_emissions(target_name, d_started_at=None, d_stopped_at=None): """ Work In Progress. :param target_name: You should probably not let users define this value, as our sanitizing for ADQL may not be 100% safe. Use values from YAML configuration, instead. Below is a list of the ids we found to be existing. > SELECT DISTINCT target_name FROM apis.epn_core - Mars - MERCURY - Jupiter - Titan - Io - VENUS - Ganymede - Uranus - Callisto - Europa - Saturn :return: """ # Try out the form # http://voparis-tap-planeto.obspm.fr/__system__/adql/query/form api_url = "http://voparis-tap.obspm.fr/__system__/tap/run/tap/sync" if d_started_at is None: d_started_at = datetime.datetime.now() t_started_at = time.mktime(d_started_at.timetuple()) - 3600 * 24 * 365 * 2 # t_started_at = 1 else: t_started_at = time.mktime(d_started_at.timetuple()) if d_stopped_at is None: d_stopped_at = datetime.datetime.now() t_stopped_at = time.mktime(d_stopped_at.timetuple()) def timestamp_to_jday(timestamp): return timestamp / 86400.0 + 2440587.5 def jday_to_timestamp(jday): return (jday - 2440587.5) * 86400.0 def jday_to_datetime(jday): return datetime.datetime.utcfromtimestamp(jday_to_timestamp(jday)) # SELECT DISTINCT dataproduct_type FROM apis.epn_core # > im sp sc query = """ SELECT time_min, time_max, thumbnail_url, external_link FROM apis.epn_core WHERE target_name='{target_name}' AND dataproduct_type='im' AND time_min >= {jday_start} AND time_min <= {jday_stop} ORDER BY time_min, granule_gid """.format( target_name=target_name.replace("'", "\\'"), jday_start=timestamp_to_jday(t_started_at), jday_stop=timestamp_to_jday(t_stopped_at) ) # query = """ # SELECT DISTINCT target_name FROM apis.epn_core # """ try: response = requests.post(api_url, { 'REQUEST': 'doQuery', 'LANG': 'ADQL', 'QUERY': query, 'TIMEOUT': '30', 'FORMAT': 'VOTable/td' }) response_xml = response.text import xml.etree.ElementTree as ET root = ET.fromstring(response_xml) namespaces = {'vo': 'http://www.ivoa.net/xml/VOTable/v1.3'} rows_xpath = "./vo:RESOURCE/vo:TABLE/vo:DATA/vo:TABLEDATA/vo:TR" rows = [] for row in root.findall(rows_xpath, namespaces): rows.append({ 'time_min': jday_to_datetime(float(row[0].text)), 'time_max': jday_to_datetime(float(row[1].text)), 'thumbnail_url': row[2].text, 'external_link': row[3].text, }) # print("Auroral emission query") # print(query) # print("Auroral emission rows") # print(rows) return rows except Exception as e: print("Failed to retrieve auroral emissions :") print(e) # print(query) def retrieve_amda_netcdf(orbiter, what, started_at, stopped_at): """ Handles remote querying AMDA's API for URLs, and then downloading, extracting and caching the netCDF files. :param orbiter: key of the source in the YAML config :param what: either 'model' or 'orbit', a key in the config of the source :param started_at: :param stopped_at: :return: a list of local file paths to netCDF (.nc) files """ url = config['amda'].format( dataSet=what, startTime=started_at.isoformat(), stopTime=stopped_at.isoformat() ) log.info("Fetching remote gzip files list at '%s'." % url) retries = 0 success = False errors = [] remote_gzip_files = [] while not success and retries < 3: try: response = urllib.urlopen(url) remote_gzip_files = json.loads(response.read()) if not remote_gzip_files: raise Exception("Failed to fetch data at '%s'." % url) if remote_gzip_files == 'NODATASET': raise Exception("API says there's no dataset at '%s'." % url) if remote_gzip_files == 'ERROR': raise Exception("API returned an error at '%s'." % url) if remote_gzip_files == ['OUTOFTIME']: # it happens return [] # raise Exception("API says it's out of time at '%s'." % url) success = True except Exception as e: log.warn("Failed (%d/3) '%s' : %s" % (retries+1, url, e.message)) remote_gzip_files = [] errors.append(e) finally: retries += 1 if not remote_gzip_files: log.error("Failed to retrieve data from AMDA.") log.error("Failed to fetch gzip files list for %s at '%s' : %s" % (orbiter, url, errors)) abort(400, "Failed to fetch gzip files list for %s at '%s' : %s" % (orbiter, url, errors)) else: remote_gzip_files = list(set(remote_gzip_files)) log.debug("Fetched remote gzip files list : %s." % str(remote_gzip_files)) local_gzip_files = [] for remote_gzip_file in remote_gzip_files: # hotfixes to remove when fixed upstream @Myriam if remote_gzip_file in ['OUTOFTIME', 'ERROR']: continue # sometimes half the response is okay, the other not if remote_gzip_file.endswith('/.gz'): continue # this is just a plain bug remote_gzip_file = remote_gzip_file.replace('cdpp1', 'cdpp', 1) ################################################ local_gzip_file = join(CACHE_DIR, get_local_filename(remote_gzip_file)) local_gzip_files.append(local_gzip_file) if not isfile(local_gzip_file): log.debug("Retrieving '%s'..." % local_gzip_file) urllib.urlretrieve(remote_gzip_file, local_gzip_file) log.debug("Retrieved '%s'." % local_gzip_file) else: log.debug("Found '%s' in the cache." % local_gzip_file) local_netc_files = [] for local_gzip_file in local_gzip_files: local_netc_file = local_gzip_file[0:-3] log.debug("Unzipping '%s'..." % local_gzip_file) success = True try: with gzip.open(local_gzip_file) as f: file_content = f.read() with open(local_netc_file, 'w+b') as g: g.write(file_content) except Exception as e: success = False log.error("Cannot process gz file '%s' from '%s' : %s" % (local_gzip_file, url, e)) # Sometimes, the downloaded gz is corrupted, and CRC checks fail. # We want to delete the local gz file and try again next time. removefile(local_gzip_file) if success: local_netc_files.append(local_netc_file) log.debug("Unzipped '%s'." % local_gzip_file) return list(set(local_netc_files)) # remove possible dupes # class DataParser: # """ # Default data parser # A wip to try to handle code exeptions sanely. # """ # # # Override these using the model configuration # default_nc_keys = { # 'hee': 'HEE', # 'vtot': 'V', # 'magn': 'B', # 'temp': 'T', # 'dens': 'N', # 'pdyn': 'P_dyn', # 'atse': 'Delta_angle', # } # # def __init__(self, target, model): # self.target = target # self.model = model # pass # # def _read_var(self, nc, _keys, _key, mandatory=False): # try: # return nc.variables[_keys[_key]] # except KeyError: # pass # if mandatory: # raise Exception("No variable '%s' found in NetCDF." % _keys[_key]) # return [None] * len(nc.variables['Time']) # slow -- use numpy? # # def parse(self, cdf_handle): # nc_keys = self.default_nc_keys.copy() # # times = cdf_handle.variables['Time'] # YYYY DOY HH MM SS .ms # data_v = self._read_var(cdf_handle, nc_keys, 'vtot') # data_b = self._read_var(cdf_handle, nc_keys, 'magn') # data_t = self._read_var(cdf_handle, nc_keys, 'temp') # data_n = self._read_var(cdf_handle, nc_keys, 'dens') # data_p = self._read_var(cdf_handle, nc_keys, 'pdyn') # data_a = self._read_var(cdf_handle, nc_keys, 'atse') # # return zip() def get_data_for_target(target_config, input_slug, started_at, stopped_at): """ :return: dict whose keys are datetime as str, values tuples of data """ log.debug("Grabbing data for '%s'..." % target_config['slug']) try: models = target_config['models'][input_slug] except Exception as e: abort(500, "Invalid model configuration for '%s' : %s" % (target_config['slug'], str(e))) try: orbits = target_config['orbit']['models'] except KeyError as e: orbits = [] # abort(500, "Invalid orbit configuration for '%s' : %s" # % (target_config['slug'], str(e))) def _sta_sto(_cnf, _sta, _sto): if 'started_at' in _cnf: _s0 = datetime.datetime.strptime(_cnf['started_at'], FILE_DATE_FMT) _s0 = max(_s0, _sta) else: _s0 = _sta if 'stopped_at' in _cnf: _s1 = datetime.datetime.strptime(_cnf['stopped_at'], FILE_DATE_FMT) _s1 = min(_s1, _sto) else: _s1 = _sto return _s0, _s1 def _read_var(_nc, _keys, _key, mandatory=False): try: return _nc.variables[_keys[_key]] except KeyError: pass if mandatory: raise Exception("No variable '%s' found in NetCDF." % _keys[_key]) return [None] * len(_nc.variables['Time']) # slow -- use numpy! # Override these using the model configuration in config.yml default_nc_keys = { 'hee': 'HEE', 'vtot': 'V', 'magn': 'B', 'temp': 'T', 'dens': 'N', 'pdyn': 'P_dyn', 'atse': 'Delta_angle', } precision = "%Y-%m-%dT%H" # model and orbits times are only equal-ish orbit_data = {} # keys are datetime as str, values arrays of XY for orbit in orbits: s0, s1 = _sta_sto(orbit, started_at, stopped_at) nc_keys = default_nc_keys.copy() if 'parameters' in orbit: nc_keys.update(orbit['parameters']) orbit_files = retrieve_amda_netcdf( target_config['slug'], orbit['slug'], s0, s1 ) for orbit_file in orbit_files: log.debug("%s: opening orbit NETCDF4 '%s'..." % (target_config['name'], orbit_file)) cdf_handle = Dataset(orbit_file, "r", format="NETCDF4") times = cdf_handle.variables['Time'] # YYYY DOY HH MM SS .ms data_hee = _read_var(cdf_handle, nc_keys, 'hee', mandatory=True) log.debug("%s: aggregating data from '%s'..." % (target_config['name'], orbit_file)) for ltime, datum_hee in zip(times, data_hee): try: dtime = datetime_from_list(ltime) except Exception: log.error("Failed to parse time from %s." % ltime) raise # Keep only what's in the interval if s0 <= dtime <= s1: dkey = round_time(dtime, 60*60).strftime(precision) orbit_data[dkey] = datum_hee cdf_handle.close() all_data = {} # keys are datetime as str, values tuples of data for model in models: s0, s1 = _sta_sto(model, started_at, stopped_at) model_files = retrieve_amda_netcdf( target_config['slug'], model['slug'], s0, s1 ) nc_keys = default_nc_keys.copy() if 'parameters' in model: nc_keys.update(model['parameters']) if len(model_files) == 0: log.warn("No model data for '%s' '%s'." % (target_config['slug'], model['slug'])) for model_file in model_files: log.debug("%s: opening model NETCDF4 '%s'..." % (target_config['name'], model_file)) cdf_handle = Dataset(model_file, "r", format="NETCDF4") # log.debug(cdf_handle.variables.keys()) times = cdf_handle.variables['Time'] # YYYY DOY HH MM SS .ms data_v = _read_var(cdf_handle, nc_keys, 'vtot') data_b = _read_var(cdf_handle, nc_keys, 'magn') data_t = _read_var(cdf_handle, nc_keys, 'temp') data_n = _read_var(cdf_handle, nc_keys, 'dens') data_p = _read_var(cdf_handle, nc_keys, 'pdyn') data_a = _read_var(cdf_handle, nc_keys, 'atse') # Usually: # Time, StartTime, StopTime, V, B, N, T, Delta_angle, P_dyn # Earth: # Time, BartelsNumber, ImfID, SwID, ImfPoints, # SwPoints, B_M_av, B_Vec_av, B_Theta_av, # B_Phi_av, B, T, N, V, Vlat, Vlon, # Alpha, RamP, E, Beta, Ma, Kp, R, DST, # AE, Flux, Flag, F10_Index, StartTime, StopTime # Don't ignore, but instead, compute a mean ? # Look for discretisation ? temporal drizzling ? 1D drizzling ? # The easy linear mean feels a bit rough too. Running mean too. # FIXME ignored_count = 0 log.debug("%s: aggregating data from '%s'..." % (target_config['name'], model_file)) for ltime, datum_v, datum_b, datum_t, datum_n, datum_p, datum_a \ in zip(times, data_v, data_b, data_t, data_n, data_p, data_a): try: dtime = datetime_from_list(ltime) except Exception: log.error("Failed to parse time from %s." % ltime) raise if not (s0 <= dtime <= s1): continue # Cull what's out of the interval droundtime = round_time(dtime, 60*60) dkey = droundtime.strftime(precision) x_hee = None y_hee = None if dkey in orbit_data: x_hee = orbit_data[dkey][0] / ASTRONOMICAL_UNIT_IN_KM y_hee = orbit_data[dkey][1] / ASTRONOMICAL_UNIT_IN_KM # First exception: V may be a vector instead of a scalar if hasattr(datum_v, '__len__'): vrad = datum_v[0] vtan = datum_v[1] vtot = sqrt(vrad * vrad + vtan * vtan) else: # eg: Earth vrad = None vtan = None vtot = datum_v # Second exception: Earth is always at (1, 0) if target_config['slug'] == 'earth': x_hee = 1 y_hee = 0 # Third exception: B is a Vector3 or a Vector2 for Earth if target_config['slug'] == 'earth': if model['slug'] == 'omni_hour_all': # Vector3 datum_b = datum_b[0] # if model['slug'] == 'ace_swepam_real': # Vector2 # datum_b = datum_b[0] if model['slug'] == 'omni_hour_all': datum_p = datum_n * vtot * vtot * 1.6726e-6 # ? if model['slug'] == 'ace_swepam_real': datum_p = datum_n * vtot * vtot * 1.6726e-6 # ? if vtot is None or isnan(vtot): continue # Keep adding exceptions here until you can't or become mad # Crude/Bad drizzling condition : first found datum in the hour # gets to set the data. Improve this. if dkey not in all_data: # /!. The Set value MUST be in the same order as PROPERTIES all_data[dkey] = ( dtime.strftime("%Y-%m-%dT%H:%M:%S+00:00"), vrad, vtan, vtot, datum_b, datum_t, datum_p, datum_n, datum_a, x_hee, y_hee ) else: ignored_count += 1 # Improve this loop so as to remove this stinky debug log if ignored_count > 0: log.debug(" Ignored %d datum(s) during ~\"drizzling\"." % ignored_count) cdf_handle.close() return all_data def generate_csv_contents(target_slug, input_slug, started_at, stopped_at): target_config = get_target_config(target_slug) log.debug("Crunching CSV contents for '%s'..." % target_config['name']) si = StringIO.StringIO() cw = csv_writer(si) cw.writerow(PROPERTIES) all_data = get_data_for_target( target_config=target_config, input_slug=input_slug, started_at=started_at, stopped_at=stopped_at ) log.debug("Writing and sorting CSV for '%s'..." % target_config['slug']) for dkey in sorted(all_data): cw.writerow(all_data[dkey]) log.info("Generated CSV contents for '%s'." % target_config['slug']) return si.getvalue() def generate_csv_file_if_needed(target_slug, input_slug, started_at, stopped_at): filename = "%s_%s_%s_%s.csv" % (target_slug, input_slug, started_at.strftime(FILE_DATE_FMT), stopped_at.strftime(FILE_DATE_FMT)) local_csv_file = join(CACHE_DIR, filename) generate = True if isfile(local_csv_file): # It needs to have more than one line to not be empty (headers) with open(local_csv_file) as f: cnt = 0 for _ in f: cnt += 1 if cnt > 1: generate = False break if generate: log.info("Generating CSV '%s'..." % local_csv_file) try: with open(local_csv_file, mode="w+") as f: f.write(generate_csv_contents( target_slug=target_slug, input_slug=input_slug, started_at=started_at, stopped_at=stopped_at )) log.info("Generation of '%s' done." % filename) except Exception as e: from sys import exc_info from traceback import extract_tb exc_type, exc_value, exc_traceback = exc_info() log.error(e) for trace in extract_tb(exc_traceback): log.error(trace) if isfile(local_csv_file): log.warn("Removing failed CSV '%s'..." % local_csv_file) removefile(local_csv_file) abort(500, "Failed creating CSV '%s' : %s" % (filename, e)) def remove_all_files(in_directory): """ Will throw if something horrible happens. Does not remove recursively (could be done with os.walk if needed). Does not remove directories either. :param in_directory: absolute path to directory :return: """ import os if not os.path.isdir(in_directory): raise ValueError("No directory to clean at '%s'.") removed_files = [] for file_name in os.listdir(in_directory): file_path = os.path.join(in_directory, file_name) if os.path.isfile(file_path): os.remove(file_path) removed_files.append(file_path) return removed_files def remove_files_created_before(date, in_directory): """ Will throw if something horrible happens. Does not remove recursively (could be done with os.walk if needed). Does not remove directories either. :param date: datetime object :param in_directory: absolute path to directory :return: """ import os import time secs = time.mktime(date.timetuple()) if not os.path.isdir(in_directory): raise ValueError("No directory to clean at '%s'.") removed_files = [] for file_name in os.listdir(in_directory): file_path = os.path.join(in_directory, file_name) if os.path.isfile(file_path): t = os.stat(file_path) if t.st_ctime < secs: os.remove(file_path) removed_files.append(file_path) return removed_files def get_input_slug_from_query(inp=None): if inp is None: input_slug = request.args.get('input_slug', 'l1') else: input_slug = inp if input_slug not in [i['slug'] for i in config['inputs']]: input_slug = 'l1' # be tolerant instead of yelling loudly return input_slug def get_interval_from_query(): """ Get the interval from the query, or from defaults. Returns ISO date strings. """ before = relativedelta(months=2) after = relativedelta(months=1) today = datetime.datetime.now().replace(hour=0, minute=0, second=0) started_at = today - before stopped_at = today + after default_started_at = started_at.strftime(FILE_DATE_FMT) default_stopped_at = stopped_at.strftime(FILE_DATE_FMT) started_at = request.args.get('started_at', default_started_at) stopped_at = request.args.get('stopped_at', default_stopped_at) d_started_at = dateparser.isoparse(started_at) d_stopped_at = dateparser.isoparse(stopped_at) started_at = d_started_at.strftime(FILE_DATE_FMT) stopped_at = d_stopped_at.strftime(FILE_DATE_FMT) return started_at, stopped_at def get_catalog_layers(input_slug, target_slug, started_at, stopped_at): """ In the JSON file we have "columns" and "data". Of course, each JSON file has its own columns, with different conventions. :param input_slug: :param target_slug: :return: """ import json def _get_index_of_key(_data, _key): try: index = _data['columns'].index(_key) except ValueError: log.error("Key %s not found in columns of %s" % (_key, _data)) raise return index try: started_at = datetime.datetime.strptime(started_at, FILE_DATE_FMT) except: abort(400, "Invalid started_at parameter : '%s'." % started_at) try: stopped_at = datetime.datetime.strptime(stopped_at, FILE_DATE_FMT) except: abort(400, "Invalid stopped_at parameter : '%s'." % stopped_at) catalog_layers = {} for config_layer in config['layers']['catalogs']: if 'data' not in config_layer: continue catalog_layers[config_layer['slug']] = [] for cl_datum in config_layer['data']: if input_slug not in cl_datum: continue if target_slug not in cl_datum[input_slug]: continue if cl_datum[input_slug][target_slug] is None: # We used ~ in the config, there are no constraints constraints = [] else: constraints = cl_datum[input_slug][target_slug]['constraints'] with open(get_path("../data/catalog/%s" % cl_datum['file'])) as f: json_data = json.load(f) if 'start' not in cl_datum: log.error("Invalid configuration: 'start' is missing.") continue # skip this if 'format' not in cl_datum: cl_datum['format'] = CME_DATE_FMT # log.error("Invalid configuration: 'format' is missing.") # continue # skip this start_index = _get_index_of_key(json_data, cl_datum['start']) if 'stop' not in cl_datum: stop_index = start_index else: stop_index = _get_index_of_key(json_data, cl_datum['stop']) for json_datum in json_data['data']: validates_any_constraint = False if 0 == len(constraints): validates_any_constraint = True for constraint in constraints: validates_constraint = True for key, possible_values in constraint.iteritems(): actual_value = json_datum[_get_index_of_key( json_data, key )] if actual_value not in possible_values: validates_constraint = False break if validates_constraint: validates_any_constraint = True break if not validates_any_constraint: continue start_time = json_datum[start_index] stop_time = json_datum[stop_index] start_time = datetime.datetime.strptime( start_time, cl_datum['format'] ) stop_time = datetime.datetime.strptime( stop_time, cl_datum['format'] ) if start_time < started_at: continue catalog_layers[config_layer['slug']].append({ 'start': start_time.strftime(MOMENT_DATE_FMT), 'stop': stop_time.strftime(MOMENT_DATE_FMT), }) return catalog_layers def get_hit_counter(): hit_count_path = get_path("../VISITS") if isfile(hit_count_path): hit_count = int(open(hit_count_path).read()) else: hit_count = 1 return hit_count def increment_hit_counter(): hit_count_path = get_path("../VISITS") if isfile(hit_count_path): hit_count = int(open(hit_count_path).read()) hit_count += 1 else: hit_count = 1 hit_counter_file = open(hit_count_path, 'w') hit_counter_file.write(str(hit_count)) hit_counter_file.close() return hit_count def update_spacepy(): """ Importing pycdf will fail if the toolbox is not up to date. """ try: log.info("Updating spacepy's toolbox…") import spacepy.toolbox spacepy.toolbox.update() except Exception as e: log.error("Failed to update spacepy : %s." % e) tpl_global_vars['visits'] = get_hit_counter() # ROUTING ##################################################################### @app.route('/favicon.ico') def favicon(): # we want it served from the root, not from static/ return send_from_directory( join(app.root_path, 'static', 'img'), 'favicon.ico', mimetype='image/vnd.microsoft.icon' ) @app.route("/") @app.route("/home.html") @app.route("/index.html") def home(): increment_hit_counter() parameters = PARAMETERS.values() parameters.sort(key=lambda x: x['position']) input_slug = get_input_slug_from_query() targets = [t for t in config['targets'] if not t['locked']] started_at, stopped_at = get_interval_from_query() for i, target in enumerate(targets): targets[i]['catalog_layers'] = get_catalog_layers( input_slug, target['slug'], started_at, stopped_at ) return render_view('home.html.jinja2', { # 'targets': config['targets'], 'targets': targets, 'parameters': parameters, 'input_slug': input_slug, 'started_at': started_at, 'stopped_at': stopped_at, 'planets': [s for s in config['targets'] if s['type'] == 'planet'], 'probes': [s for s in config['targets'] if s['type'] == 'probe'], 'comets': [s for s in config['targets'] if s['type'] == 'comet'], 'visits': get_hit_counter(), }) @app.route("/about.html") def about(): import uuid increment_hit_counter() return render_view('about.html.jinja2', { 'authors_emails': [a['mail'] for a in config['authors']], 'uuid4': str(uuid.uuid4())[0:3], 'visits': get_hit_counter(), }) @app.route("/help.html") def help(): return render_view('help.html.jinja2', { 'visits': get_hit_counter(), }) @app.route("/<target>_<inp>_<started_at>_<stopped_at>.csv") def download_target_csv(target, inp, started_at, stopped_at): """ Grab data and orbit data for the specified `target`, rearrange it and return it as a CSV file. `started_at` and `stopped_at` should be UTC. `inp` is the input slug, l1 or sa or sb. """ check_target_config(target) try: started_at = datetime.datetime.strptime(started_at, FILE_DATE_FMT) except: abort(400, "Invalid started_at parameter : '%s'." % started_at) try: stopped_at = datetime.datetime.strptime(stopped_at, FILE_DATE_FMT) except: abort(400, "Invalid stopped_at parameter : '%s'." % stopped_at) input_slug = get_input_slug_from_query(inp=inp) filename = "%s_%s_%s_%s.csv" % (target, input_slug, started_at.strftime(FILE_DATE_FMT), stopped_at.strftime(FILE_DATE_FMT)) local_csv_file = join(CACHE_DIR, filename) generate_csv_file_if_needed( target_slug=target, input_slug=input_slug, started_at=started_at, stopped_at=stopped_at ) if not isfile(local_csv_file): abort(500, "Could not cache CSV file at '%s'." % local_csv_file) return send_from_directory(CACHE_DIR, filename) @app.route("/<targets>_<inp>_<started_at>_<stopped_at>.tar.gz") def download_targets_tarball(targets, inp, started_at, stopped_at): """ Grab data and orbit data for each of the specified `targets`, in their own CSV file, and make a tarball of them. `started_at` and `stopped_at` should be UTC strings. Note: we do not use this route anymore, but let's keep it shelved for now. targets: string list of targets' slugs, separated by `-`. """ separator = '-' targets = targets.split(separator) targets.sort() targets_configs = [] for target in targets: if not target: abort(400, "Invalid targets format : `%s`." % targets) targets_configs.append(get_target_config(target)) if 0 == len(targets_configs): abort(400, "No valid targets specified. What are you doing?") try: started_at = datetime.datetime.strptime(started_at, FILE_DATE_FMT) except: abort(400, "Invalid started_at parameter : '%s'." % started_at) try: stopped_at = datetime.datetime.strptime(stopped_at, FILE_DATE_FMT) except: abort(400, "Invalid stopped_at parameter : '%s'." % stopped_at) sta = started_at.strftime(FILE_DATE_FMT) sto = stopped_at.strftime(FILE_DATE_FMT) input_slug = get_input_slug_from_query(inp=inp) gzip_filename = "%s_%s_%s_%s.tar.gz" % ( separator.join(targets), input_slug, sta, sto ) local_gzip_file = join(CACHE_DIR, gzip_filename) if not isfile(local_gzip_file): log.debug("Creating the CSV files for the tarball...") for target_config in targets_configs: filename = "%s_%s_%s_%s.csv" % ( target_config['slug'], input_slug, sta, sto ) local_csv_file = join(CACHE_DIR, filename) if not isfile(local_csv_file): with open(local_csv_file, mode="w+") as f: f.write(generate_csv_contents( target_slug=target_config['slug'], started_at=started_at, stopped_at=stopped_at, input_slug=input_slug )) log.debug("Creating the tarball '%s'..." % local_gzip_file) with tarfile.open(local_gzip_file, "w:gz") as tar: for target_config in targets_configs: filename = "%s_%s_%s_%s.csv" % ( target_config['slug'], input_slug, sta, sto ) local_csv_file = join(CACHE_DIR, filename) tar.add(local_csv_file, arcname=filename) if not isfile(local_gzip_file): abort(500, "No tarball to serve. Looked at '%s'." % local_gzip_file) return send_from_directory(CACHE_DIR, gzip_filename) @app.route("/<targets>_<inp>_<params>_<started_at>_<stopped_at>.nc") def download_targets_netcdf(targets, inp, params, started_at, stopped_at): """ NOTE : This is not used anymore. Grab data and orbit data for the specified `target`, rearrange it and return it as a NetCDF file. `started_at` and `stopped_at` are expected to be UTC. targets: string list of targets' slugs, separated by `-`. params: string list of targets' parameters, separated by `-`. """ separator = '-' # /!\ this char should never be in target's slugs targets = targets.split(separator) targets.sort() targets_configs = [] for target in targets: if not target: abort(400, "Invalid targets format : `%s`." % targets) targets_configs.append(get_target_config(target)) if 0 == len(targets_configs): abort(400, "No valid targets specified. What are you doing?") params = params.split(separator) params.sort() if 0 == len(params): abort(400, "No valid parameters specified. What are you doing?") if not is_list_in_list(params, PARAMETERS.keys()): abort(400, "Some parameters are not recognized in '%s'." % str(params)) date_fmt = FILE_DATE_FMT try: started_at = datetime.datetime.strptime(started_at, date_fmt) except: abort(400, "Invalid started_at parameter : '%s'." % started_at) try: stopped_at = datetime.datetime.strptime(stopped_at, date_fmt) except: abort(400, "Invalid stopped_at parameter : '%s'." % stopped_at) sta = started_at.strftime(date_fmt) sto = stopped_at.strftime(date_fmt) input_slug = get_input_slug_from_query(inp=inp) nc_filename = "%s_%s_%s_%s_%s.nc" % ( separator.join(targets), separator.join(params), input_slug, sta, sto ) nc_path = join(CACHE_DIR, nc_filename) if not isfile(nc_path): log.debug("Creating the NetCDF file '%s'..." % nc_filename) nc_handle = Dataset(nc_path, "w", format="NETCDF4") try: nc_handle.description = "Model and orbit data for targets" # todo nc_handle.history = "Created " + time.ctime(time.time()) nc_handle.source = "Heliopropa (CDDP)" available_params = list(PROPERTIES) for target in targets_configs: target_slug = target['slug'] log.debug("Adding group '%s' to the NetCDF..." % target_slug) nc_group = nc_handle.createGroup(target_slug) data = get_data_for_target( target_config=target, input_slug=input_slug, started_at=started_at, stopped_at=stopped_at ) dkeys = sorted(data) dimension = 'dim_'+target_slug nc_handle.createDimension(dimension, len(dkeys)) # TIME # nc_time = nc_group.createVariable('time', 'i8', (dimension,)) nc_time.units = "hours since 1970-01-01 00:00:00" nc_time.calendar = "standard" times = [] for dkey in dkeys: time_as_string = data[dkey][0][:-6] # remove +00:00 tail date = datetime.datetime.strptime(time_as_string, date_fmt) times.append(date2num( date, units=nc_time.units, calendar=nc_time.calendar )) nc_time[:] = times # SELECTED PARAMETERS # nc_vars = [] indices = [] for param in params: indices.append(available_params.index(param)) nc_var = nc_group.createVariable(param, 'f8', (dimension,)) nc_var.units = PARAMETERS[param]['units'] nc_vars.append(nc_var) for i, nc_var in enumerate(nc_vars): index = indices[i] values = [] for dkey in dkeys: dval = data[dkey] values.append(dval[index]) nc_var[:] = values # ORBIT # nc_x = nc_group.createVariable('xhee', 'f8', (dimension,)) nc_x.units = 'Au' nc_y = nc_group.createVariable('yhee', 'f8', (dimension,)) nc_y.units = 'Au' values_x = [] values_y = [] index_x = available_params.index('xhee') index_y = available_params.index('yhee') for dkey in dkeys: dval = data[dkey] values_x.append(dval[index_x]) values_y.append(dval[index_y]) nc_x[:] = values_x nc_y[:] = values_y log.debug("Writing NetCDF '%s'..." % nc_filename) except Exception: log.error("Failed to generate NetCDF '%s'." % nc_filename) raise finally: nc_handle.close() if not isfile(nc_path): abort(500, "No NetCDF to serve. Looked at '%s'." % nc_path) return send_from_directory(CACHE_DIR, nc_filename) @app.route("/<targets>_<inp>_<started_at>_<stopped_at>.cdf") def download_targets_cdf(targets, inp, started_at, stopped_at): """ Grab data and orbit data for the specified `target`, rearrange it and return it as a CDF file. `started_at` and `stopped_at` are expected to be UTC. targets: string list of targets' slugs, separated by `-`. params: string list of targets' parameters, separated by `-`. """ separator = '-' # /!\ this char should never be in target's slugs targets = targets.split(separator) targets.sort() targets_configs = [] for target in targets: if not target: abort(400, "Invalid targets format : `%s`." % targets) targets_configs.append(get_target_config(target)) if 0 == len(targets_configs): abort(400, "No valid targets specified. What are you doing?") params = PARAMETERS.keys() try: started_at = datetime.datetime.strptime(started_at, FILE_DATE_FMT) except: abort(400, "Invalid started_at parameter : '%s'." % started_at) try: stopped_at = datetime.datetime.strptime(stopped_at, FILE_DATE_FMT) except: abort(400, "Invalid stopped_at parameter : '%s'." % stopped_at) sta = started_at.strftime(FILE_DATE_FMT) sto = stopped_at.strftime(FILE_DATE_FMT) input_slug = get_input_slug_from_query(inp=inp) cdf_filename = "%s_%s_%s_%s.cdf" % ( separator.join(targets), input_slug, sta, sto ) cdf_path = join(CACHE_DIR, cdf_filename) if not isfile(cdf_path): log.debug("Creating the CDF file '%s'..." % cdf_filename) try: from spacepy import pycdf except ImportError: # If spacepy's toolbox is not up-to-date, importing will fail. # So, let's update and try again ! update_spacepy() try: from spacepy import pycdf except ImportError as e: log.error("Failed to import pycdf from spacepy : %s" % e) raise except Exception as e: log.error("Failed to import pycdf from spacepy : %s" % e) raise try: cdf_handle = pycdf.CDF(cdf_path, masterpath='') targets_names = ', '.join([t['name'] for t in targets_configs]) description = "Model and orbit data for %s." % targets_names cdf_handle.attrs['Description'] = description cdf_handle.attrs['Author'] = "Heliopropa.irap.omp.eu (CDPP)" cdf_handle.attrs['Created'] = str(time.ctime(time.time())) # fixme: Try changing the name from 00 to something relevant cdf_handle.attrs['Title'] = "Heliopropa - %s" % targets_names ####### available_params = list(PROPERTIES) for target in targets_configs: target_slug = target['slug'] data = get_data_for_target( target_config=target, input_slug=input_slug, started_at=started_at, stopped_at=stopped_at ) dkeys = sorted(data) values = [] for dkey in dkeys: time_str = data[dkey][0][:-6] # remove +00:00 tail date = datetime.datetime.strptime(time_str, FILE_DATE_FMT) values.append(date) kt = "%s_time" % target_slug cdf_handle.new(kt, type=pycdf.const.CDF_EPOCH) cdf_handle[kt] = values # cdf_handle[kt].attrs['FIELDNAM'] = "Time since 0 A.D" for param in params: k = "%s_%s" % (target_slug, param) # print("PARAM %s" % k) values = [] i = available_params.index(param) has_nones = False for dkey in dkeys: value = data[dkey][i] if value is None: has_nones = True values.append(value) if has_nones: # PyCDF hates it when there are Nones. # Since we don't know what value to set instead, # let's skip the param altogether. continue cdf_handle[k] = values attrs = cdf_handle[k].attrs attrs['UNITS'] = PARAMETERS[param]['units'] attrs['LABLAXIS'] = PARAMETERS[param]['name'] attrs['FIELDNAM'] = PARAMETERS[param]['title'] if values: attrs['VALIDMIN'] = min(values) attrs['VALIDMAX'] = max(values) kx = "%s_xhee" % target_slug ky = "%s_yhee" % target_slug values_xhee = [] values_yhee = [] index_x = available_params.index('xhee') index_y = available_params.index('yhee') for dkey in dkeys: value_xhee = data[dkey][index_x] value_yhee = data[dkey][index_y] # We've got some `None`s cropping up in the data sometimes. # PyCDF does not digest Nones at all. # While they solve this upstream, let's make an ugly fix! if (value_xhee is not None) and (value_yhee is not None): values_xhee.append(value_xhee) values_yhee.append(value_yhee) else: values_xhee.append(0) values_yhee.append(0) log.warn("Orbit data for %s has NaNs." % target_slug) cdf_handle[kx] = values_xhee cdf_handle[ky] = values_yhee cdf_handle[kx].attrs['UNITS'] = 'Au' cdf_handle[ky].attrs['UNITS'] = 'Au' log.debug("Writing CDF '%s'..." % cdf_filename) cdf_handle.close() log.debug("Wrote CDF '%s'." % cdf_filename) except Exception as e: log.error("Failed to generate CDF '%s'." % cdf_filename) if isfile(cdf_path): removefile(cdf_path) raise if not isfile(cdf_path): abort(500, "No CDF to serve. Looked at '%s'." % cdf_path) return send_from_directory(CACHE_DIR, cdf_filename) @app.route("/<target>_auroral_catalog.csv") def download_auroral_catalog_csv(target): tc = validate_tap_target_config(target) log.debug("Requesting auroral emissions CSV for %s..." % tc['name']) filename = "%s_auroral_catalog.csv" % (target) local_csv_file = join(CACHE_DIR, filename) target_name = tc['tap']['target_name'] emissions = retrieve_auroral_emissions(target_name) # Be careful with regexes in python 2 ; best always use the ^$ thumbnail_url_filter = re.compile("^.*proc(?:_small)?\\.(?:jpe?g|png|webp|gif|bmp|tiff)$") # Filter the emissions def _keep_emission(emission): ok = thumbnail_url_filter.match(emission['thumbnail_url']) # print("ok", ok, emission['thumbnail_url']) return bool(ok) emissions = [e for e in emissions if _keep_emission(e)] header = ('time_min', 'time_max', 'thumbnail_url', 'external_link') if len(emissions): header = emissions[0].keys() si = StringIO.StringIO() cw = csv_dict_writer(si, fieldnames=header) cw.writeheader() # 'time_min', 'time_max', 'thumbnail_url', 'external_link' #cw.writerow(head) log.debug("Writing auroral emissions CSV for %s..." % tc['name']) cw.writerows(emissions) log.info("Generated auroral emissions CSV contents for %s." % tc['name']) return si.getvalue() # if not isfile(local_csv_file): # abort(500, "Could not cache CSV file at '%s'." % local_csv_file) # # return send_from_directory(CACHE_DIR, filename) @app.route("/test/auroral/<target>") def test_auroral_emissions(target): tc = validate_tap_target_config(target) target_name = tc['tap']['target_name'] retrieved = retrieve_auroral_emissions(target_name) return "%d results:\n%s" % (len(retrieved), str(retrieved)) # API ######################################################################### @app.route("/cache/clear") def cache_clear(): """ Removes all files from the cache. Note: It also removes the .gitkeep file. Not a problem for prod. """ removed_files = remove_all_files(CACHE_DIR) count = len(removed_files) return "Cache cleared! Removed %d file%s." \ % (count, 's' if count != 1 else '') @app.route("/cache/cleanup") def cache_cleanup(): """ Removes all files from the cache that are older than roughly one month. Note: It also removes the .gitkeep file. Maybe it should not, but hey. """ a_month_ago = datetime.datetime.now() - datetime.timedelta(days=32) removed_files = remove_files_created_before(a_month_ago, CACHE_DIR) count = len(removed_files) return "Cache cleaned! Removed %d old file%s." \ % (count, 's' if count != 1 else '') @app.route("/cache/warmup") def cache_warmup(): """ Warms up the cache for the current day. """ warmup_started_at = datetime.datetime.now() sta, sto = get_interval_from_query() inp = 'l1' # default input, maybe warm them all up ? targets = get_active_targets() targets_slugs = [target['slug'] for target in targets] targets_slugs.sort() update_spacepy() for target in targets: download_target_csv(target['slug'], inp, sta, sto) download_targets_cdf('-'.join(targets_slugs), inp, sta, sto) warmup_ended_at = datetime.datetime.now() warmup_timedelta = warmup_ended_at - warmup_started_at return "Done in %s." % str(warmup_timedelta) @app.route("/log") @app.route("/log.html") def log_show(): with open(LOG_FILE, 'r') as f: contents = f.read() return "<pre>" + contents + "</pre>" @app.route("/log/clear") def log_clear(): with open(LOG_FILE, 'w') as f: f.truncate() return "Log cleared successfully." # DEV TOOLS ################################################################### # @app.route("/inspect") # def analyze_cdf(): # """ # For debug purposes. # """ # cdf_to_inspect = get_path("../res/dummy.nc") # cdf_to_inspect = get_path("../res/dummy_jupiter_coordinates.nc") # # si = StringIO.StringIO() # cw = csv.DictWriter(si, fieldnames=['Name', 'Shape', 'Length']) # cw.writeheader() # # # Time, StartTime, StopTime, V, B, N, T, Delta_angle, P_dyn, QualityFlag # cdf_handle = Dataset(cdf_to_inspect, "r", format="NETCDF4") # for variable in cdf_handle.variables: # v = cdf_handle.variables[variable] # cw.writerow({ # 'Name': variable, # 'Shape': v.shape, # 'Length': v.size, # }) # cdf_handle.close() # # return si.getvalue() # MAIN ######################################################################## if __name__ == "__main__": # Debug mode is on, as the production server does not use this but run.wsgi extra_files = [get_path('../config.yml')] app.run(debug=True, extra_files=extra_files)