Source code for virtualargofleet.utilities

import collections
import warnings
import numpy as np
import xarray as xr
from tqdm import tqdm
import concurrent.futures
import multiprocessing
import os
import pandas as pd
import logging
import json
import urllib.request
from string import Formatter
import platform
import socket
import psutil
from packaging import version
from typing import List, Dict, Union, TextIO
import jsonschema
from referencing import Registry, Resource
from jsonschema import Draft202012Validator
from pathlib import Path


# Module-level logger, registered under the "virtualfleet.utils" name
log = logging.getLogger("virtualfleet.utils")
# Absolute path to the 'assets' folder located next to this file
path2data = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'assets')
# Path to the 'schemas' folder, one level above this file's folder (the '..' segment is not normalised)
path2schemas = os.path.sep.join([os.path.dirname(os.path.abspath(__file__)), '..', 'schemas'])


class VFschema:
    """Base class to export JSON documents that follow a schema

    Subclasses must define two class attributes used throughout this class:

    - ``properties``: names of the attributes exported by :attr:`__dict__` and :meth:`to_json`,
    - ``required``: names of the keyword arguments that must be given to the constructor.

    They may also define ``schema`` (used by :meth:`to_json` to build the ``$schema`` entry)
    and ``description`` (appended to the ``repr``).
    """
    schema_root: str = "https://raw.githubusercontent.com/euroargodev/VirtualFleet/json-schemas-FloatConfiguration/schemas"

    def __init__(self, **kwargs):
        """Store every keyword argument listed in ``properties`` as an instance attribute

        Raises
        ------
        ValueError
            If a key listed in ``required`` is missing from the keyword arguments.
        """
        for key in self.required:
            if key not in kwargs:
                raise ValueError("Missing '%s' property" % key)
        for key, value in kwargs.items():
            # Keyword arguments that are not declared properties are ignored
            if key in self.properties:
                setattr(self, key, value)

    def __repr__(self):
        # Optional properties that were never set are skipped instead of raising AttributeError
        summary = [
            "%s=%s" % (p, getattr(self, p))
            for p in self.properties
            if p != 'description' and hasattr(self, p)
        ]
        if hasattr(self, 'description'):
            summary.append("%s='%s'" % ('description', getattr(self, 'description')))

        return "%s(%s)" % (self.__class__.__name__, ", ".join(summary))

    def _repr_html_(self):
        return self.__repr__()

    class JSONEncoder(json.JSONEncoder):
        """JSON encoder handling pandas time objects and :class:`VFschema` instances"""

        def default(self, obj):
            # ``type(pd.NaT)`` avoids reaching into the private ``pd._libs`` namespace
            if isinstance(obj, type(pd.NaT)):
                return None
            if isinstance(obj, (pd.Timestamp, pd.Timedelta)):
                return obj.isoformat()
            # Any schema handler (whatever the subclass) is exported through its properties
            if isinstance(obj, VFschema):
                return obj.__dict__
            # Otherwise use the default behavior (raises TypeError)
            return super().default(obj)

    @property
    def __dict__(self):
        """Dictionary of the declared ``properties`` that are set on this instance

        A new dictionary is built on each access: modifying it does not modify the instance.
        """
        return {key: getattr(self, key) for key in self.properties if hasattr(self, key)}

    def to_json(self, fp: Union[str, Path, TextIO] = None, indent=4):
        """Save to JSON file or return a JSON string that can be loaded with json.loads()

        Parameters
        ----------
        fp: str, :class:`pathlib.Path` or file-like object, optional
            Where to write the JSON document. An object with a ``write`` method is used as is,
            anything else is considered a path to a file to (over)write.
            If set to None (default), nothing is written and the JSON string is returned.
        indent: int, default: 4
            Indentation level passed to the :mod:`json` module

        Returns
        -------
        str or None
            The JSON string if ``fp`` is None, None otherwise.
        """
        jsdata = self.__dict__
        if hasattr(self, 'schema'):
            jsdata["$schema"] = "%s/%s.json" % (self.schema_root, getattr(self, 'schema'))

        if fp is None:
            return json.dumps(jsdata, indent=indent, cls=self.JSONEncoder)
        if hasattr(fp, 'write'):
            return json.dump(jsdata, fp, indent=indent, cls=self.JSONEncoder)
        with Path(fp).open('w') as fpp:
            return json.dump(jsdata, fpp, indent=indent, cls=self.JSONEncoder)

    @staticmethod
    def validate(data, schema) -> Union[bool, List]:
        """Validate a JSON file against a JSON schema file

        Parameters
        ----------
        data: str or :class:`pathlib.Path`
            Path to the JSON file to validate
        schema: str or :class:`pathlib.Path`
            Path to the JSON schema file (validated with the Draft 2020-12 validator)

        Returns
        -------
        bool or list
            True if ``data`` is valid, the list of validation errors otherwise.
        """
        # Read schema and create validator:
        schema = json.loads(Path(schema).read_text())
        res = Resource.from_contents(schema)
        # NOTE(review): ``Registry(retrieve=...)`` is documented as taking a callable, a Resource
        # is given here. Left unchanged, to be confirmed against the ``referencing`` documentation.
        registry = Registry(retrieve=res)
        validator = jsonschema.Draft202012Validator(schema, registry=registry)

        # Read data and collect all validation errors in a single pass:
        data = json.loads(Path(data).read_text())
        try:
            errors = list(validator.iter_errors(data))
        except (jsonschema.exceptions.SchemaError,
                jsonschema.exceptions.UnknownType,
                jsonschema.exceptions.UndefinedTypeCheck) as error:
            # These are problems with the schema itself, not with the data: log and propagate
            log.debug(type(error).__name__)
            raise

        return True if len(errors) == 0 else errors


class VFschema_meta(VFschema):
    """JSON schema handler for meta-data of a :class:`ConfigParam` instance

    None of the properties is mandatory (``required`` is empty).
    """
    # Declared properties, only annotated here: they exist on an instance if given to the constructor
    unit: str
    dtype: str
    techkey: str

    # Not listed in ``properties``: shown by ``repr`` but not exported to JSON
    description: str = "Meta-data of a configuration parameter"
    # Names of the attributes handled by the base class (constructor, ``__dict__``, ``to_json``)
    properties: List = ["unit", "dtype", "techkey"]
    # Names of the keyword arguments the base class constructor insists on
    required: List = []

    @staticmethod
    def from_dict(obj: Dict) -> 'VFschema_meta':
        """Create an instance using the items of ``obj`` as keyword arguments"""
        return VFschema_meta(**obj)


class VFschema_parameter(VFschema):
    """JSON schema handler for a :class:`ConfigParam` instance

    ``name`` and ``value`` are mandatory, ``description`` and ``meta`` are optional.
    """
    name: str
    value: Union[str, float]
    meta: VFschema_meta
    description: str = "One configuration parameter"

    properties: List = ["name", "value", "description", "meta"]
    required: List = ["name", "value"]

    def __init__(self, **kwargs):
        # Meta-data given as a plain dictionary are promoted to a schema handler
        meta = kwargs.get('meta')
        if isinstance(meta, dict):
            kwargs['meta'] = VFschema_meta.from_dict(meta)
        super().__init__(**kwargs)

    @staticmethod
    def from_dict(obj: Dict) -> 'VFschema_parameter':
        """Create an instance using the items of ``obj`` as keyword arguments"""
        return VFschema_parameter(**obj)


class VFschema_configuration(VFschema):
    """JSON schema handler for a :class:`FloatConfiguration` instance

    If ``created`` is missing or None, it is set to the current UTC time.
    """
    version: str
    name: str
    parameters: List[VFschema_parameter]
    created: pd.Timestamp = None

    schema: str = "VF-ArgoFloat-Configuration"
    description: str = "VirtualFleet Argo Float configuration"
    properties: List = ["created", "version", "name", "parameters"]
    required: List = ["created", "version", "name", "parameters"]

    def __init__(self, **kwargs):
        # Time stamp the configuration when the caller did not
        if kwargs.get('created') is None:
            kwargs['created'] = pd.to_datetime('now', utc=True)

        # Parameters given as plain dictionaries are promoted to schema handlers,
        # any other item is kept as is
        if 'parameters' in kwargs:
            kwargs['parameters'] = [
                VFschema_parameter.from_dict(item) if isinstance(item, dict) else item
                for item in kwargs['parameters']
            ]

        super().__init__(**kwargs)

    @staticmethod
    def from_dict(obj: Dict) -> 'VFschema_configuration':
        """Create an instance using the items of ``obj`` as keyword arguments"""
        return VFschema_configuration(**obj)


class ConfigParam:
    """Configuration parameter manager

    Hold one key/value pair together with its meta-data: description, unit, dtype and techkey.
    When a ``dtype`` is given, it is called on the value each time the value is set.

    Examples
    --------
    >>> p = ConfigParam(key='profile_depth', value=2000.)
    >>> p = ConfigParam(key='profile_depth', value=2000., unit='m', description='Maximum profile depth', dtype=float)
    >>> p.value

    """

    def str_val(self, v):
        """Return the name of ``v`` if it is a type (eg: 'float'), ``str(v)`` otherwise"""
        if isinstance(v, type):
            # "<class 'float'>" -> "float"
            return ("%s" % v).split("'")[1]
        return str(v)

    def __init__(self, key, value, **kwargs):
        self.key = key
        # Meta-data default to empty strings, keyword arguments take over
        self.meta = {'description': '', 'unit': '', 'dtype': '', 'techkey': ''}
        self.meta.update(kwargs)
        self.set_value(value)

    def __repr__(self):
        description, unit = self.meta['description'], self.meta['unit']
        desc_txt = "(%s)" % description if description != '' else ""
        unit_txt = "[%s]" % unit if unit != '' else ""
        return "%s %s: %s %s" % (self.key, desc_txt, self.value, unit_txt)

    def get_value(self):
        """Return the parameter value"""
        return self._value

    def set_value(self, value):
        """Set the parameter value, cast with the ``dtype`` meta-data when it is not empty

        Raises
        ------
        ValueError
            If the cast raises a ValueError
        """
        caster = self.meta['dtype']
        if caster != '':
            try:
                value = caster(value)
            except ValueError:
                raise ValueError("Cannot cast '%s' value as expected %s" % (self.key, self.str_val(caster)))
        self._value = value

    @property
    def json_schema(self):
        """This parameter as a :class:`VFschema_parameter` instance"""
        meta = {
            'unit': self.meta['unit'],
            'dtype': self.str_val(self.meta['dtype']),
            'techkey': self.meta['techkey'],
        }
        content = {
            'name': self.key,
            'value': self.value,
            'description': self.meta['description'],
            'meta': meta,
        }
        return VFschema_parameter.from_dict(content)

    def to_json(self, *args, **kwargs):
        """Return a dictionary serialisable in json or write to file"""
        return self.json_schema.to_json(*args, **kwargs)

    value = property(get_value, set_value)


class FloatConfiguration:
    """Float mission configuration manager

    Create a default configuration and then possibly update parameter values

    Can be used to create a virtual fleet, to save or load float configurations

    Examples
    --------
    >>> cfg = FloatConfiguration('default')  # Internally defined
    >>> cfg = FloatConfiguration('local-change')  # Internally defined
    >>> cfg = FloatConfiguration('cfg_file.json')  # From any json file
    >>> cfg = FloatConfiguration([6902919, 132])  # From Euro-Argo Fleet API
    >>> cfg.update('parking_depth', 500)  # Update one parameter value
    >>> cfg.params  # Return the list of parameters
    >>> cfg.mission  # Return the configuration as a dictionary
    >>> cfg.tech  # Return the configuration as a dictionary using Argo technical keys
    >>> cfg.to_json("cfg_file.json")  # Save to file for later re-use
    >>> cfg

    """

    def __init__(self, name: Union[str, list] = 'default', *args, **kwargs):
        """
        Parameters
        ----------
        name: str or list
            Name of the configuration to load. Can be:

            - one of the internally defined configurations: 'default', 'local-change',
              'gulf-stream' (or its alias 'gse-experiment'),
            - the path to a json file (the name must end with '.json'),
            - a list ``[wmo, cycle_number]`` to start from the default configuration and
              overwrite known parameters with values from the Euro-Argo meta-data API.

        Raises
        ------
        ValueError
            If ``name`` is none of the above, or if a json file has an unsupported format version.
        """
        self._params_dict = {}

        def load_from_json_v1(path):
            """Load parameters from a json file with format version 1.0"""
            with open(path, "r") as f:
                js = json.load(f)
            if js['version'] != "1.0":
                raise ValueError("This file is not with format 1.0 version: '%s'" % js['version'])
            data = js['data']
            for key in data.keys():
                value = data[key]['value']
                meta = data[key]['meta']
                # SECURITY(review): eval() executes text read from the file, only load trusted files
                meta['dtype'] = eval(meta['dtype'])
                self.params = ConfigParam(key=key, value=value, **meta)
            return js['name'], data

        def load_from_json_v2(path):
            """Validate and load parameters from a json file with format version 2.0"""
            with open(path, "r") as f:
                js = json.load(f)
            if js['version'] != "2.0":
                raise ValueError("This file is not with format 2.0 version: '%s'" % js['version'])

            # Validate json against schema:
            json_schema = os.path.join(path2schemas, 'VF-ArgoFloat-Configuration.json')
            errors = VFschema_configuration.validate(path, json_schema)
            if isinstance(errors, list):
                # Log the validation errors themselves (not the ``list`` builtin)
                log.debug(errors)
                raise jsonschema.exceptions.ValidationError(
                    "This Float configuration file is not valid against format version 2.0\n%s" % str(errors))

            # Load to FloatConfiguration instance:
            parameters = js['parameters']
            for param_obj in parameters:
                key = param_obj['name']
                value = param_obj['value']
                meta = param_obj['meta']
                meta['description'] = param_obj['description']
                # SECURITY(review): eval() executes text read from the file, only load trusted files
                meta['dtype'] = eval(meta['dtype'])
                self.params = ConfigParam(key=key, value=value, **meta)
            return js['name'], parameters

        def load_from_json(path):
            """Load parameters from a json file, dispatching on its format version"""
            with open(path, "r") as f:
                js = json.load(f)
            if js['version'] == "1.0":
                warnings.warn("There is a newer json file format '2.0' for Argo float configuration available, please re-save this configuration, it will automatically be updated to the new format.")
                return load_from_json_v1(path)
            elif js['version'] == "2.0":
                return load_from_json_v2(path)
            else:
                raise ValueError("Unsupported file format version '%s'" % js['version'])

        if name == 'default':
            name, data = load_from_json(os.path.join(path2data, 'FloatConfiguration_default.json'))

        elif name == 'local-change':
            name, data = load_from_json(os.path.join(path2data, 'FloatConfiguration_local_change.json'))

        elif name == 'gse-experiment' or name == "gulf-stream":
            name, data = load_from_json(os.path.join(path2data, 'FloatConfiguration_gulf_stream.json'))

        elif isinstance(name, str) and os.path.splitext(name)[-1] == ".json":
            name, data = load_from_json(name)

        elif isinstance(name, list):
            # Load default configuration:
            load_from_json(os.path.join(path2data, 'FloatConfiguration_default.json'))

            # Load configuration of one float cycle from EA API
            wmo = name[0]
            cyc = name[1]
            name = "Float %i - Cycle %i" % (wmo, cyc)
            # Set the name now, so that the error message of update() can use it
            self.name = name
            df = get_float_config(wmo, cyc)
            di = {'CONFIG_ProfilePressure_dbar': 'profile_depth',
                  'CONFIG_ParkPressure_dbar': 'parking_depth',
                  'CONFIG_CycleTime_hours': 'cycle_duration',
                  'CONFIG_MaxCycles_NUMBER': 'life_expectancy',
                  'CONFIG_AscentSpeed_mm/s': 'vertical_speed',
                  }

            # Over-write known parameters:
            for code in di.keys():
                if code in df:
                    self.update(di[code], df[code].iloc[0])
                    if code == 'CONFIG_AscentSpeed_mm/s':
                        self.update(di[code], df[code].iloc[0]/1000)  # Convert mm/s to m/s
                else:
                    msg = "%s not found for this profile, fall back on default value: %s" % \
                          (code, self._params_dict[di[code]])
                    warnings.warn(msg)

        else:
            raise ValueError("Please give me a known configuration name ('default', 'local-change', 'gulf-stream') or a json file to load from !")

        self.name = name

    def __repr__(self):
        summary = ["<FloatConfiguration><%s>" % self.name]
        for p in self._params_dict.keys():
            summary.append("- %s" % str(self._params_dict[p]))
        return "\n".join(summary)

    @property
    def params(self):
        """List of parameter keys

        Assigning a :class:`ConfigParam` to this property registers it as a new parameter.
        An already registered key is left untouched.
        """
        return sorted([self._params_dict[p].key for p in self._params_dict.keys()])

    @params.setter
    def params(self, param):
        if not isinstance(param, ConfigParam):
            raise ValueError("param must be a 'ConfigParam' instance")
        if param.key not in self.params:
            self._params_dict[param.key] = param
            # Keep parameters sorted by key
            self._params_dict = collections.OrderedDict(sorted(self._params_dict.items()))

    def update(self, key: str, new_value):
        """Update value to an existing parameter

        Parameters
        ----------
        key: str
            Name of the parameter to update
        new_value:
            New value to attribute to this parameter

        Returns
        -------
        :class:`FloatConfiguration`
            This instance, so that calls can be chained

        Raises
        ------
        ValueError
            If ``key`` is not a parameter of this configuration
        """
        if key not in self.params:
            raise ValueError("Invalid parameter '%s' for configuration '%s'" % (key, self.name))
        self._params_dict[key].value = new_value
        return self

    @property
    def mission(self):
        """Return the float configuration as a dictionary to be used by a :class:`VirtualFleet`"""
        mission = {}
        for key in self._params_dict.keys():
            mission[key] = self._params_dict[key].value
        return mission

    @property
    def tech(self):
        """Float configuration as a dictionary using Argo technical keys

        Parameters with an empty 'techkey' meta-data are not included.
        """
        mission = {}
        for key in self._params_dict.keys():
            if self._params_dict[key].meta['techkey'] != '':
                techkey = self._params_dict[key].meta['techkey']
                mission[techkey] = self._params_dict[key].value
        return mission

    @property
    def json_schema(self):
        """This configuration as a :class:`VFschema_configuration` instance (format version '2.0')"""
        parameters = [self._params_dict[key].json_schema for key in self._params_dict]
        return VFschema_configuration.from_dict({
            'name': self.name,
            'version': '2.0',
            'parameters': parameters
        })

    def to_json(self, *args, **kwargs):
        """Return the configuration as a JSON string or write it to file

        Arguments are passed to the ``to_json`` method of the :attr:`json_schema` object.
        """
        return self.json_schema.to_json(*args, **kwargs)
class SimulationSet:
    """Convenient class to manage a collection of simulations meta-data

    This class is used by VirtualFleet instances to keep track of all calls to the 'simulate' method.

    In the future, this could be used to computed aggregated statistics from several simulations.

    Examples
    --------
    >>> s = SimulationSet()
    >>> s.add({'execution_time': 1.8, 'platform': 'Darwin', 'output_path': 'trajectories_part1.zarr'})
    >>> s.add({'execution_time': 2.4, 'platform': 'Darwin', 'output_path': 'trajectories_part2.zarr'})
    >>> s.N
    >>> s.last

    """

    def __init__(self):
        self.runs = []
        self.simulated = False

    def __repr__(self):
        lines = (
            "<VirtualFleet.SimulationSet>",
            "Executed: %s" % self.simulated,
            "Number of simulation(s): %i" % self.N,
        )
        return "\n".join(lines)

    def add(self, params):
        """Register the meta-data of one more simulation and return the set itself"""
        self.runs.append(params)
        self.simulated = True
        return self

    @property
    def N(self):
        """Number of simulations registered in the set"""
        return len(self.runs)

    @property
    def last(self):
        """Meta-data of the most recently registered simulation"""
        return self.runs[-1]


def get_splitdates(t, N=1):
    """Given a list of dates, return index of dates before a date change larger than N days"""
    gaps_in_days = np.diff(t).astype('timedelta64[D]')
    threshold = np.timedelta64(N, 'D')
    return np.argwhere(gaps_in_days > threshold)[:, 0]


def splitonprofiles(ds, N=1):
    """Select the observations of ``ds`` located right before a time gap larger than N days"""
    split_index = get_splitdates(ds['time'], N=N)
    return ds.isel(obs=split_index)
def simu2index(ds: xr.Dataset, N: int = 1):
    """Convert a trajectory simulation :class:`xarray.Dataset` to an Argo index of profiles

    Profiles are identified using the ``cycle_number`` dataset variable. A profile is identified if the last
    observation of a cycle_number sequence is in cycle_phase 3 or 4.

    This function remains compatible with older versions of trajectory netcdf files without the ``cycle_number``
    variable. In this case, a profile is identified if the last observation of a cycle_phase==3 sequence is separated
    by N days from the next sequence.

    Parameters
    ----------
    ds: :class:`xarray.Dataset`
        The simulation trajectories dataset
    N: int, optional
        The minimal time lag between cycle_phase sequences to be identified as a new profile. This will be removed in
        the future when we'll drop support for old netcdf outputs.

    Returns
    -------
    df: :class:`pandas.DataFrame`
        The profiles index

    Raises
    ------
    ValueError
        If no profile could be identified
    """
    # The name of the trajectory dimension depends on the parcels version that wrote the dataset
    parcels_version = version.parse(ds.attrs['parcels_version'])
    trajdim = 'traj' if parcels_version < version.parse("2.4.0") else 'trajectory'

    trajectories = ds[trajdim]
    profiles = []

    if 'cycle_number' not in ds.data_vars:
        warnings.warn("This is an old trajectory file, results not guaranteed !")
        for traj in tqdm(trajectories, total=len(trajectories)):
            for iphase, grp in ds.loc[{trajdim: traj}].groupby(group='cycle_phase'):
                if iphase != 3:
                    continue
                sub_grp = splitonprofiles(grp, N=N)
                n_obs = len(sub_grp['obs'])
                sub_grp['cycle_number'] = xr.DataArray(np.arange(1, n_obs + 1), dims='obs')
                sub_grp['traj_id'] = xr.DataArray(np.full_like(sub_grp['obs'], fill_value=traj.data), dims='obs')
                profiles.append(sub_grp)
    else:
        for traj in tqdm(trajectories, total=len(trajectories)):
            for cyc, grp in ds.loc[{trajdim: traj}].groupby(group='cycle_number'):
                # Only the last observation of each cycle can be a profile
                last_obs = grp.isel(obs=-1)
                if last_obs['cycle_phase'] in [3, 4]:
                    last_obs['traj_id'] = xr.DataArray(np.full_like((1,), fill_value=traj.data))
                    profiles.append(last_obs)

    if not profiles:
        raise ValueError('No virtual floats reaches the final cycling phase, hence no profiles to index')

    ds_profiles = xr.concat(profiles, dim='obs')
    if 'wmo' not in ds_profiles.data_vars:
        ds_profiles['wmo'] = ds_profiles['traj_id'] + 9000000

    df = ds_profiles.to_dataframe()
    df = df.rename({'time': 'date', 'lat': 'latitude', 'lon': 'longitude', 'z': 'min_depth'}, axis='columns')
    df = df[['date', 'latitude', 'longitude', 'wmo', 'cycle_number', 'traj_id']]
    for column in ('wmo', 'cycle_number', 'traj_id'):
        df[column] = df[column].astype('int')
    # Truncate positions to 3 decimals
    for column in ('latitude', 'longitude'):
        df[column] = np.fix(df[column] * 1000).astype('int') / 1000
    return df.reset_index(drop=True)
def simu2index_par(ds):
    """Compute the profiles index of a simulation, processing trajectories in a thread pool

    Each trajectory of ``ds`` (selected along the 'traj' dimension) is reduced to its profiles by one
    of two helpers, chosen on the presence of the ``cycle_number`` variable. Results are then
    concatenated along 'obs' and converted to a :class:`pandas.DataFrame`.

    Parameters
    ----------
    ds:
        The simulation trajectories dataset.
        NOTE(review): presumably a :class:`xarray.Dataset` with 'traj' and 'obs' dimensions and a
        'trajectory' variable, as used below - confirm against callers.

    Returns
    -------
    :class:`pandas.DataFrame`
        With columns: date, latitude, longitude, wmo, cycle_number, traj_id
    """

    def reducerA(sub_ds):
        # Used when there is no 'cycle_number' variable: profiles are searched in the
        # cycle_phase==3 group with ``splitonprofiles`` (N=1).
        # Returns None if no group has cycle_phase == 3.
        sub_grp = None
        traj_id = np.unique(sub_ds['trajectory'])[0]
        for iphase, grp in sub_ds.groupby(group='cycle_phase'):
            if iphase == 3:
                sub_grp = splitonprofiles(grp, N=1)
                sub_grp['cycle_number'] = xr.DataArray(np.arange(1, len(sub_grp['obs']) + 1), dims='obs')
                sub_grp['traj_id'] = xr.DataArray(
                    np.full_like(sub_grp['obs'], fill_value=traj_id), dims='obs')
        return sub_grp

    def reducerB(sub_ds):
        # Used when the 'cycle_number' variable exists: keep the last observation of each
        # cycle when it is in cycle_phase 3 or 4.
        ds_list = []
        traj_id = np.unique(sub_ds['trajectory'])[0]
        for cyc, grp in sub_ds.groupby(group='cycle_number'):
            ds_cyc = grp.isel(obs=-1)
            if ds_cyc['cycle_phase'] in [3, 4]:
                ds_cyc['traj_id'] = xr.DataArray(np.full_like((1,), fill_value=traj_id), dims='obs')
                ds_list.append(ds_cyc)
        try:
            ds = xr.concat(ds_list, dim='obs')
        except Exception:
            # Any concatenation failure is logged and this trajectory is reported as None
            log.debug("Error on concat with traj_id: %i" % traj_id)
            ds = None
            pass
        return ds

    reducer = reducerB if 'cycle_number' in ds.data_vars else reducerA

    # One worker thread per CPU reported by multiprocessing
    ConcurrentExecutor = concurrent.futures.ThreadPoolExecutor(max_workers=multiprocessing.cpu_count())
    with ConcurrentExecutor as executor:
        # Map each future to the trajectory sub-dataset it was submitted with
        future_to_url = {executor.submit(reducer, ds.sel(traj=traj)): ds.sel(traj=traj) for traj in
                         ds['traj']}
        futures = concurrent.futures.as_completed(future_to_url)
        futures = tqdm(futures, total=len(ds['traj']))

        ds_list, failed = [], []
        for future in futures:
            data = None
            try:
                data = future.result()
            except Exception:
                # The exception is re-raised right away: ``failed`` is filled but never read afterwards
                failed.append(future_to_url[future])
                raise
            finally:
                # Runs in all cases, ``data`` is still None if the reducer raised
                ds_list.append(data)

    ds_list = [r for r in ds_list if r is not None]  # Only keep non-empty results
    ds_profiles = xr.concat(ds_list, dim='obs')
    if 'wmo' not in ds_profiles.data_vars:
        ds_profiles['wmo'] = ds_profiles['traj_id'] + 9000000
    df = ds_profiles.to_dataframe()
    df = df.rename({'time': 'date', 'lat': 'latitude', 'lon': 'longitude', 'z': 'min_depth'}, axis='columns')
    df = df[['date', 'latitude', 'longitude', 'wmo', 'cycle_number', 'traj_id']]
    # Note that 'cycle_number' is not cast to int here
    df['wmo'] = df['wmo'].astype('int')
    df['traj_id'] = df['traj_id'].astype('int')
    # Truncate positions to 3 decimals
    df['latitude'] = np.fix(df['latitude'] * 1000).astype('int') / 1000
    df['longitude'] = np.fix(df['longitude'] * 1000).astype('int') / 1000
    df = df.reset_index(drop=True)
    return df
def simu2csv(simu_file: str, index_file: str = None, df: pd.DataFrame = None):
    """Save simulation results profile index to file, as Argo index

    Argo profile index can be loaded with argopy.

    Parameters
    ----------
    simu_file: str
        Path to netcdf file of simulation results, to load profiles from
    index_file: str, optional
        Path to csv file to write index to. By default, it is set using the ``simu_file`` value:
        the file extension is replaced with '_ar_index_prof.txt'.
    df: :class:`pandas.DataFrame`, optional
        If provided, will be used as the profile index, otherwise, compute index from ``simu_file``

    Returns
    -------
    index_file: str
        Path to the Argo profile index created
    """
    if index_file is None:
        # The default name derives from the simulation file (``index_file`` is None here).
        # Appending to the root also avoids replacing other occurrences of the extension in the path.
        file_name, _ = os.path.splitext(simu_file)
        index_file = file_name + "_ar_index_prof.txt"

    txt_header = """# Title : Profile directory file of a VirtualFleet simulation
# Description : Profiles from simulation result file: {}
# Project : ARGO, EARISE
# Format version : 2.0
# Date of update : {}
# FTP root number 1 : ftp://ftp.ifremer.fr/ifremer/argo/dac
# FTP root number 2 : ftp://usgodae.org/pub/outgoing/argo/dac
# GDAC node : -
""".format(os.path.abspath(simu_file), pd.to_datetime('now', utc=True).strftime('%Y%m%d%H%M%S'))

    # (Over)write the file with the header, index rows are appended afterwards
    with open(index_file, 'w') as f:
        f.write(txt_header)

    if df is None:
        log.debug("Computing profile index from simulation file: %s" % simu_file)
        engine = 'zarr' if '.zarr' in simu_file else 'netcdf4'
        ds = xr.open_dataset(simu_file, engine=engine)
        ardf = simu2index(ds)
    else:
        # Work on a copy, columns are added below
        ardf = df.copy()

    if len(ardf) > 0:
        log.debug("Writing profile index file: %s" % index_file)
        with open(index_file, 'a+') as f:
            ardf['institution'] = 'VF'  # VirtualFleet
            ardf['profiler_type'] = 999  # Reserved
            ardf['ocean'] = 'A'  # Atlantic ocean area
            ardf['date_update'] = pd.to_datetime('now', utc=True)
            ardf['file'] = ardf.apply(
                lambda row: "vf/%i/profiles/R%i_%0.2d.nc" % (row['wmo'], row['wmo'], row['cycle_number']),
                axis=1)
            ardf = ardf[['file', 'date', 'latitude', 'longitude', 'ocean', 'profiler_type', 'institution',
                         'date_update']]
            ardf.to_csv(f, index=False, date_format='%Y%m%d%H%M%S')

    return index_file
def set_WMO(ds: xr.Dataset, argo_index: pd.DataFrame):
    """Identify virtual floats with their real WMO

    This function will try to identify WMO from ``argo_index`` in the ``ds`` trajectories.

    The Argo index must have the ``longitude``, ``latitude`` and ``wmo`` variables. It's assumed to be the
    deployment plan.

    Real WMO numbers are identified as the closest floats from ``argo_index`` to the initial positions of virtual
    floats from ``ds``.

    Parameters
    ----------
    ds: :class:`xarray.Dataset`
        The simulation trajectories dataset
    argo_index: :class:`pandas.DataFrame`
        The deployment plan profiles index

    Returns
    -------
    ds: :class:`xarray.Dataset`
        The simulation trajectories dataset with a new variable ``wmo``
    """
    n_traj = len(ds['traj'])
    ds['wmo'] = xr.DataArray(np.full((n_traj,), 0), dims='traj')

    # Positions from the index, rounded to 5 decimals
    lon_real = np.round(argo_index['longitude'], 5).values
    lat_real = np.round(argo_index['latitude'], 5).values

    wmos = []
    for i in ds['traj']:
        # First observation (obs=0) of this trajectory once sorted by time, rounded to 3 decimals
        first_obs = ds.isel(traj=i).sortby('time').sel(obs=0)
        lon_virt = np.round(first_obs['lon'], 3).values
        lat_virt = np.round(first_obs['lat'], 3).values

        # Row of the index closest to this position (distance in lon/lat space)
        distance = np.sqrt(np.power(lon_real - lon_virt, 2) + np.power(lat_real - lat_virt, 2))
        nearest = np.argmin(distance)
        wmos.append(np.array((argo_index.iloc[nearest]['wmo'],))[0])

    ds['wmo'].values = wmos
    return ds
def get_float_config(wmo: int, cyc: int = None):
    """Download float configuration using the Euro-Argo meta-data API

    Parameters
    ----------
    wmo: int
        The float WMO number
    cyc: int, default: None
        The specific cycle number to retrieve data from. If set to None, all cycles meta-data are fetched.

    Returns
    -------
    :class:`pandas.DataFrame`
        A dataframe with relevant float configuration parameters for 1 or more cycle numbers.
        The index is named 'CYCLE_NUMBER' and columns are 'CONFIG_*' codes in sorted order.
    """
    def id_mission(missionCycles, a_cyc):
        # Return the key of ``missionCycles`` whose list of cycles (as str) holds ``a_cyc``,
        # or None if there is none. If several keys match, the last one wins.
        this_mission = None
        for im, mission in enumerate(missionCycles):
            cycles = missionCycles[mission]
            if str(a_cyc) in cycles:
                this_mission = mission
        return this_mission

    # Download float meta-data from EA API:
    URI = "https://fleetmonitoring.euro-argo.eu/floats/%s" % wmo
    with urllib.request.urlopen(URI) as url:
        data = json.load(url)

    # data['configurations']['cycles'] -> dict
    # keys (str): comma separated list of cycle numbers
    #   Eg: '14,15,22'
    # values (list of dict): items of the list are one parameter description and value as a dict.
    #   Eg: {'argoCode': 'CONFIG_AscentSpeedMin_mm/s', 'dimLevel': 7, 'value': 83.0, 'description': None}

    # data['configurations']['missionCycles'] -> dict
    # keys (str): configuration ID
    #   Eg: '24'
    # values (list of str): items of the list are cycle numbers, as str, with this configuration
    #   Eg: ['105', '78', '80', '83', '99', '108', '74', '93']

    # Get the list of cycles covered:
    all_cycles = []
    for mission in data['configurations']['missionCycles']:
        cycles = data['configurations']['missionCycles'][mission]
        # The comprehension variable ``cyc`` is local to the comprehension,
        # the ``cyc`` argument is not modified
        cycles = np.sort([int(cyc) for cyc in cycles])
        [all_cycles.append(cyc) for cyc in cycles]  # comprehension only used for its side effect
    all_cycles = np.sort(all_cycles)
    if cyc is not None:
        # Only the requested cycle is processed
        all_cycles = [cyc]

    # Create a dictionary with 'CONFIG_*' as keys
    # the CONFIG_MissionID is set manually, then we add all possible config keys by scanning all argoCode
    CONFIG = {'CONFIG_MissionID': []}
    for cyles_with_this_config in list(data['configurations']['cycles'].keys()):
        for item in data['configurations']['cycles'][cyles_with_this_config]:
            if cyc is not None:
                # Only keep codes of the configuration covering the requested cycle
                if cyc in [int(c) for c in cyles_with_this_config.split(',')]:
                    CONFIG[item['argoCode']] = []
            else:
                CONFIG[item['argoCode']] = []
    CONFIG = dict(sorted(CONFIG.items()))

    # Loop through all cycle numbers
    # for each cycle we identify the associated entry
    for icyc, a_cyc in enumerate(all_cycles):
        mission_id = id_mission(data['configurations']['missionCycles'], a_cyc)
        for im, cyles_with_this_config in enumerate(data['configurations']['cycles']):
            if a_cyc in [int(c) for c in cyles_with_this_config.split(',')]:
                this_config = data['configurations']['cycles'][cyles_with_this_config]
                for key in CONFIG:
                    if key in [item['argoCode'] for item in this_config]:
                        for item in this_config:
                            if item['argoCode'] == key:
                                # Append at most one value per cycle for this code
                                if len(CONFIG[item['argoCode']]) == icyc:  # issue: https://github.com/euroargodev/VirtualFleet/issues/26
                                    CONFIG[item['argoCode']].append(item['value'])
                    elif key == 'CONFIG_MissionID':
                        CONFIG['CONFIG_MissionID'].append(mission_id)
                    else:
                        # Code not defined for this cycle: filled with an empty string
                        CONFIG[key].append('')

    df = pd.DataFrame(CONFIG, index=all_cycles)
    df.index.name = 'CYCLE_NUMBER'
    return df
def strfdelta(tdelta, fmt='{D:02}d {H:02}h {M:02}m {S:02}s', inputtype='timedelta'):
    """Convert a datetime.timedelta object or a regular number to a custom-
    formatted string, just like the strftime() method does for datetime.datetime
    objects.

    The fmt argument allows custom formatting to be specified.  Fields can
    include seconds, minutes, hours, days, and weeks.  Each field is optional.

    Some examples:
        '{D:02}d {H:02}h {M:02}m {S:02}s' --> '05d 08h 04m 02s' (default)
        '{W}w {D}d {H}:{M:02}:{S:02}'     --> '4w 5d 8:04:02'
        '{D:2}d {H:2}:{M:02}:{S:02}'      --> ' 5d  8:04:02'
        '{H}h {S}s'                       --> '72h 800s'

    The inputtype argument allows tdelta to be a regular number instead of the
    default, which is a datetime.timedelta object.  Valid inputtype strings:
        's', 'seconds',
        'm', 'minutes',
        'h', 'hours',
        'd', 'days',
        'w', 'weeks'

    Raises
    ------
    ValueError
        If ``inputtype`` is not 'timedelta' or one of the strings listed above
    """
    seconds_per_unit = {
        's': 1, 'seconds': 1,
        'm': 60, 'minutes': 60,
        'h': 3600, 'hours': 3600,
        'd': 86400, 'days': 86400,
        'w': 604800, 'weeks': 604800,
    }

    # Convert tdelta to integer seconds.
    if inputtype == 'timedelta':
        remainder = int(tdelta.total_seconds())
    elif inputtype in seconds_per_unit:
        remainder = int(tdelta) * seconds_per_unit[inputtype]
    else:
        # Fail with an explicit message rather than with an unbound local variable further down
        raise ValueError("Invalid inputtype '%s', must be 'timedelta' or one of: %s"
                         % (inputtype, ", ".join("'%s'" % k for k in seconds_per_unit)))

    f = Formatter()
    desired_fields = {field_name for _, field_name, _, _ in f.parse(fmt)}
    # From the largest to the smallest unit: each requested field takes what it can from
    # the remainder, units that are not requested are carried over to the smaller ones
    constants = {'W': 604800, 'D': 86400, 'H': 3600, 'M': 60, 'S': 1}
    values = {}
    for field, size in constants.items():
        if field in desired_fields:
            values[field], remainder = divmod(remainder, size)
    return f.format(fmt, **values)


def getSystemInfo():
    """Return system information as a dict

    Returns
    -------
    dict or None
        Platform, architecture, host name, IP address, processor and RAM of the machine.
        If any of these cannot be retrieved, the exception is logged and None is returned.
    """
    try:
        info = {}
        info['platform'] = platform.system()
        info['platform-release'] = platform.release()
        info['platform-version'] = platform.version()
        info['architecture'] = platform.machine()
        info['hostname'] = socket.gethostname()
        info['ip-address'] = socket.gethostbyname(socket.gethostname())
        # info['mac-address']=':'.join(re.findall('..', '%012x' % uuid.getnode()))
        info['processor'] = platform.processor()
        info['ram'] = str(round(psutil.virtual_memory().total / (1024.0 ** 3))) + " GB"
        return info
    except Exception as e:
        # Best effort: report the problem and let the caller deal with a missing result
        logging.exception(e)
        return None