Source code for ocbpy.instruments.supermag

# -*- coding: utf-8 -*-
# Copyright (C) 2017 AGB
# Full license can be found in LICENSE.txt
# ---------------------------------------------------------------------------
"""Perform OCB gridding for SuperMAG data.

Notes
-----
SuperMAG data available at: http://supermag.jhuapl.edu/

"""

import datetime as dt
import numpy as np
import warnings

import ocbpy
import ocbpy.ocb_scaling as ocbscal


[docs]def supermag2ascii_ocb(smagfile, outfile, hemisphere=0, ocb=None, ocbfile='default', instrument='', max_sdiff=600, min_merit=None, max_merit=None, scale_func=ocbscal.normal_curl_evar, **kwargs): """Covert and scales the SuperMAG data into OCB coordinates. Parameters ---------- smagfile : str File containing the required SuperMAG file sorted by time outfile : str Filename for the output data hemisphere : int Hemisphere to process (can only do one at a time). 1=Northern, -1=Southern, 0=Determine from data (default=0) ocb : ocbpy.OCBoundary, ocbpy.DualBoundary, or NoneType OCBoundary or DualBoundary object with data loaded already. If None, looks to `ocbfile` and creates an OCBoundary object. (default=None) ocbfile : str File containing the required OC Boundary data sorted by time, or 'default' to load default file for time and hemisphere. Only used if no OCBoundary object is supplied (default='default') instrument : str Instrument providing the OCBoundaries. Requires 'image' or 'ampere' if a file is provided. If using filename='default', also accepts 'amp', 'si12', 'si13', 'wic', and ''. (default='') max_sdiff : int Maximum seconds between OCB and data record in sec (default=60) min_merit : float or NoneType Minimum value for the default figure of merit or None to not apply a custom minimum (default=None) max_merit : float or NoneTye Maximum value for the default figure of merit or None to not apply a custom maximum (default=None) kwargs : dict Dict with optional selection criteria. The key should correspond to a data attribute and the value must be a tuple with the first value specifying 'max', 'min', 'maxeq', 'mineq', or 'equal' and the second value specifying the value to use in the comparison. min_sectors : int Minimum number of MLT sectors required for good OCB. Deprecated, will be removed in version 0.3.1+ (default=7). rcent_dev : float Maximum number of degrees between the new centre and the AACGM pole. Deprecated, will be removed in version 0.3.1+ (default=8.0) max_r : float Maximum radius for open-closed field line boundary in degrees/ Deprecated, will be removed in version 0.3.1+ (default=23.0) min_r : float Minimum radius for open-closed field line boundary in degrees. Deprecated, will be removed in version 0.3.1+ (default=10.0) scale_func : function or NoneType Scale the magnetic field observations unless None (default=ocbpy.ocb_scale.normal_curl_evar) Raises ------ IOError If unable to open the input or output file Notes ----- May only process one hemisphere at a time. See Also -------- ocbpy.ocb_scale.normal_curl_evar """ # Test inputs if not ocbpy.instruments.test_file(smagfile): raise IOError("SuperMAG file cannot be opened [{:s}]".format(smagfile)) if not isinstance(outfile, str): raise IOError("output filename is not a string [{:}]".format(outfile)) # Read the superMAG data and calculate the magnetic field magnitude header, mdata = load_supermag_ascii_data(smagfile) # Load the OCB data for the SuperMAG data period if ocb is None or (not isinstance(ocb, ocbpy.OCBoundary) and not isinstance(ocb, ocbpy.DualBoundary)): mstart = mdata['DATETIME'][0] - dt.timedelta(seconds=max_sdiff + 1) mend = mdata['DATETIME'][-1] + dt.timedelta(seconds=max_sdiff + 1) # If hemisphere isn't specified, set it here if hemisphere == 0: hemisphere = np.sign(np.nanmax(mdata['MLAT'])) # Ensure that all data is in the same hemisphere if hemisphere == 0: hemisphere = np.sign(np.nanmin(mdata['MLAT'])) elif hemisphere != np.sign(np.nanmin(mdata['MLAT'])): raise ValueError("".join(["cannot process observations from " "both hemispheres at the same time;" "set hemisphere=+/-1 to choose."])) # Initialize the OCBoundary object ocb = ocbpy.OCBoundary(ocbfile, stime=mstart, etime=mend, hemisphere=hemisphere, instrument=instrument) elif hemisphere == 0: # If the OCBoundary object is specified and hemisphere isn't use # the OCBoundary object to specify the hemisphere hemisphere = ocb.hemisphere # Test the OCB data if ocb.records == 0: ocbpy.logger.error("no data in the Boundary file(s)") return # Add check for deprecated and custom kwargs dep_comp = {'min_sectors': ['num_sectors', ('mineq', 7)], 'rcent_dev': ['r_cent', ('maxeq', 8.0)], 'max_r': ['r', ('maxeq', 23.0)], 'min_r': ['r', ('mineq', 10.0)]} cust_keys = list(kwargs.keys()) for ckey in cust_keys: if ckey in dep_comp.keys(): warnings.warn("".join(["Deprecated kwarg will be removed in ", "version 0.3.1+. To replecate behaviour", ", use {", dep_comp[ckey][0], ": ", repr(dep_comp[ckey][1]), "}"]), DeprecationWarning, stacklevel=2) del kwargs[ckey] if hasattr(ocb, dep_comp[ckey][0]): kwargs[dep_comp[ckey][0]] = dep_comp[ckey][1] # Remove the data with NaNs/Inf and from the opposite hemisphere/equator igood = np.where((np.isfinite(mdata['MLT'])) & (np.isfinite(mdata['MLAT'])) & (np.isfinite(mdata['BE'])) & (np.isfinite(mdata['BN'])) & (np.isfinite(mdata['BZ'])) & (np.sign(mdata['MLAT']) == hemisphere))[0] if igood.shape != mdata['MLT'].shape: for mkey in mdata.keys(): mdata[mkey] = mdata[mkey][igood] # Open and test the file to ensure it can be written with open(outfile, 'w') as fout: # Write the output line outline = "#DATE TIME NST STID " optional_keys = ["SML", "SMU", "SZA"] for okey in optional_keys: if okey in mdata.keys(): outline = "{:s}{:s} ".format(outline, okey) outline = "".join([outline, "MLAT MLT BMAG BN BE BZ OCB_MLAT OCB_MLT ", "OCB_BMAG OCB_BN OCB_BE OCB_BZ\n"]) fout.write(outline) # Initialise the ocb and SuperMAG indices imag = 0 nmag = mdata['DATETIME'].shape[0] # Cycle through the data, matching SuperMAG and OCB records while imag < nmag and ocb.rec_ind < ocb.records: imag = ocbpy.match_data_ocb(ocb, mdata['DATETIME'], idat=imag, max_tol=max_sdiff, min_merit=min_merit, max_merit=max_merit, **kwargs) if imag < nmag and ocb.rec_ind < ocb.records: # Get all of the points for this time pairing itime = np.where(mdata['DATETIME'] == mdata['DATETIME'][imag]) # Set this value's AACGM vector values vdata = ocbscal.VectorData( itime[0], ocb.rec_ind, mdata['MLAT'][itime], mdata['MLT'][itime], aacgm_n=mdata['BN'][itime], aacgm_e=mdata['BE'][itime], aacgm_z=mdata['BZ'][itime], scale_func=scale_func) vdata.set_ocb(ocb) # Output one line for each time for tind, jmag in enumerate(itime[0]): # Format the output line: # DATE TIME NST [SML SMU] STID [SZA] MLAT MLT BMAG BN BE # BZ OCB_MLAT OCB_MLT OCB_BMAG OCB_BN OCB_BE OCB_BZ # Recall that NST is the number of stations at this time, # so output the number of indices to be output at this time. outline = "{:} {:d} {:s} ".format(mdata['DATETIME'][jmag], len(itime[0]), mdata['STID'][jmag]) for okey in optional_keys: if okey == "SZA": outline = "{:s}{:.2f} ".format(outline, mdata[okey][jmag]) else: outline = "{:s}{:d} ".format(outline, mdata[okey][jmag]) outline = "".join([ outline, "{:.2f} ".format(vdata.aacgm_lat[tind]), "{:.2f} {:.2f} {:.2f} {:.2f} {:.2f} {:.2f} ".format( vdata.aacgm_mlt[tind], vdata.aacgm_mag[tind], vdata.aacgm_n[tind], vdata.aacgm_e[tind], vdata.aacgm_z[tind], vdata.ocb_lat[tind]), "{:.2f} {:.2f} {:.2f} {:.2f} {:.2f}\n".format( vdata.ocb_mlt[tind], vdata.ocb_mag[tind], vdata.ocb_n[tind], vdata.ocb_e[tind], vdata.ocb_z[tind])]) fout.write(outline) # Move to next line imag = itime[0][-1] + 1 return
[docs]def load_supermag_ascii_data(filename): """Load a SuperMAG ASCII data file. Parameters ---------- filename : str SuperMAG ASCI data file name Returns ------- out : dict The dict keys are specified by the header data line, the data for each key are stored in the numpy array """ fill_val = 999999 header = list() ind = {"SMU": fill_val, "SML": fill_val} out = {"YEAR": list(), "MONTH": list(), "DAY": list(), "HOUR": list(), "MIN": list(), "SEC": list(), "DATETIME": list(), "NST": list(), "SML": list(), "SMU": list(), "STID": list(), "BN": list(), "BE": list(), "BZ": list(), "MLT": list(), "MLAT": list(), "DEC": list(), "SZA": list()} if not ocbpy.instruments.test_file(filename): return header, dict() # Open the datafile and read the data with open(filename, "r") as f: hflag = True n = -1 for line in f.readlines(): if hflag: # Fill the header list header.append(line) if line.find("=========================================") >= 0: hflag = False else: # Fill the output dictionary if n < 0: # This is a date line n = 0 lsplit = np.array(line.split(), dtype=int) dtime = dt.datetime(lsplit[0], lsplit[1], lsplit[2], lsplit[3], lsplit[4], lsplit[5]) snum = lsplit[-1] else: lsplit = line.split() if len(lsplit) == 2: # This is an index line ind[lsplit[0]] = int(lsplit[1]) else: # This is a station data line out['YEAR'].append(dtime.year) out['MONTH'].append(dtime.month) out['DAY'].append(dtime.day) out['HOUR'].append(dtime.hour) out['MIN'].append(dtime.minute) out['SEC'].append(dtime.second) out['DATETIME'].append(dtime) out['NST'].append(snum) for k in ind.keys(): out[k].append(ind[k]) out['STID'].append(lsplit[0]) out['BN'].append(float(lsplit[1])) out['BE'].append(float(lsplit[2])) out['BZ'].append(float(lsplit[3])) out['MLT'].append(float(lsplit[4])) out['MLAT'].append(float(lsplit[5])) out['DEC'].append(float(lsplit[6])) out['SZA'].append(float(lsplit[7])) n += 1 if n == snum: n = -1 ind = {"SMU": fill_val, "SML": fill_val} # Recast data as numpy arrays and replace fill value with np.nan for k in out: if k == "STID": out[k] = np.array(out[k], dtype=str) else: out[k] = np.array(out[k]) if k in ['BE', 'BN', 'DEC', 'SZA', 'MLT', 'BZ']: out[k][out[k] == fill_val] = np.nan return header, out