# Source code for fmu.ensemble.observations

"""
Observations support and related calculations
"""

import datetime
import logging
import math
import os
from collections import OrderedDict

import dateutil
import pandas as pd
import yaml

from .ensemble import ScratchEnsemble
from .ensemblecombination import EnsembleCombination
from .ensembleset import EnsembleSet
from .realization import ScratchRealization
from .realizationcombination import RealizationCombination
from .virtualensemble import VirtualEnsemble
from .virtualrealization import VirtualRealization

logger = logging.getLogger(__name__)


class Observations(object):
    """Represents a set of observations and the ability to
    compare realizations and ensembles to the observations

    The primary data structure is a dictionary holding actual
    observations, this can typically be loaded from a yaml-file

    Key functionality is to be able to compute mismatch pr
    observation and presenting the computed data as a Pandas
    Dataframe. If run on ensembles, every row will be tagged
    by which realization index the data was computed for.

    An observation unit is a concept for the observation and points to
    something we define as a "single" observation. It can be one value
    for one datatype at a specific date, but in the case of Eclipse
    summary vector, it can also be a time-series. Mismatches will
    be computed pr. observation unit.

    Pay attention to mismatch versus misfit. Here, mismatch is used
    for individual observation units, while misfit is used as a single
    number for whole realizations.

    Important: Using time-series as observations is not recommended in
    assisted history match. Pick individual uncorrelated data points
    at relevant points in time instead.

    The type of observations supported must follow the datatypes that
    the realizations and ensemble objects are able to internalize.
    """

    # Discussion points:
    # * Should mismatch calculation happen in this function
    #   with ensembles/realizations input or the other way around?
    # * Should it be possible to represent the observations
    #   themselves in a dataframe, or does the dict suffice?
    #   (each observation unit should be representable as
    #   a dict, and then it is mergeable in Pandas)

    def __init__(self, observations):
        """Initialize an observation object with observations
        from file or from an incoming dictionary structure

        Observations will be checked for validity, and
        incorrect observations (wrong format, unsupported etc.)
        will be removed.

        Empty observation list is allowed, and will typically end in
        empty result dataframes

        Args:
            observations: dict with observation structure or string
                with path to a yaml file.

        Raises:
            ValueError: if the incoming object is neither a dict nor
                a string.
        """
        self.observations = {}

        if isinstance(observations, str):
            with open(observations, encoding="utf-8") as yamlfile:
                self.observations = yaml.full_load(yamlfile)
        elif isinstance(observations, dict):
            self.observations = observations
        else:
            raise ValueError("Unsupported object for observations")

        # Remove unsupported observations
        # Identify and warn about errors in observation syntax (dates etc)
        self._clean_observations()
        logger.info("Initialized observation with obstypes %s", str(self.keys()))
        for obskey in self.keys():
            # BUGFIX: the original log call only supplied the unit count as
            # the %s argument, producing a meaningless message (it carried
            # a fixme note). Log both the category and its unit count.
            logger.info(
                "  %s: %d observation units",
                obskey,
                len(self.observations[obskey]),
            )

    def __getitem__(self, someobject):
        """Pick objects from the observations dict"""
        return self.observations[someobject]
[docs] def mismatch(self, ens_or_real): """Compute the mismatch from the current observation set to the incoming ensemble or realization. In the case of an ensemble, it will calculate individually for every realization, and aggregate the results. Returns: dataframe with REAL (only if ensemble), OBSKEY, DATE, L1, L2. One row for every observation unit. """ # For ensembles, we should in the future be able to loop # over realizations in a multiprocessing fashion if isinstance(ens_or_real, EnsembleCombination): logger.info("Evaluating EnsembleCombination") ens_or_real = ens_or_real.to_virtual() if isinstance(ens_or_real, RealizationCombination): logger.info("Evaluating RealizationCombination") ens_or_real = ens_or_real.to_virtual() if isinstance(ens_or_real, EnsembleSet): mismatches = {} # pylint: disable=protected-access for ensname, ens in ens_or_real._ensembles.items(): logger.info("Calculating mismatch for ensemble %s", ensname) for realidx, real in ens.realizations.items(): logger.info("Calculating mismatch for realization %s", str(realidx)) mismatches[(ensname, realidx)] = self._realization_mismatch(real) mismatches[(ensname, realidx)]["REAL"] = realidx mismatches[(ensname, realidx)]["ENSEMBLE"] = ensname return pd.concat(mismatches, axis=0, ignore_index=True) if isinstance(ens_or_real, ScratchEnsemble): mismatches = {} for realidx, real in ens_or_real.realizations.items(): mismatches[realidx] = self._realization_mismatch(real) mismatches[realidx]["REAL"] = realidx return pd.concat(mismatches, axis=0, ignore_index=True, sort=False) if isinstance(ens_or_real, VirtualEnsemble): logger.info("Calculating mismatch on ensemble %s", ens_or_real.name) mismatches = {} for realidx in ens_or_real.realindices: mismatches[realidx] = self._realization_mismatch( ens_or_real.get_realization(realidx) ) mismatches[realidx]["REAL"] = realidx return pd.concat(mismatches, axis=0, ignore_index=True, sort=False) if isinstance(ens_or_real, (ScratchRealization, VirtualRealization)): 
return self._realization_mismatch(ens_or_real) if isinstance(ens_or_real, EnsembleSet): raise NotImplementedError raise ValueError("Unsupported object for mismatch calculation")
[docs] def load_smry(self, realization, smryvector, time_index="yearly", smryerror=None): """Add an observation unit from a VirtualRealization or ScratchRealization, being a specific summaryvector, picking values with the specified time resolution. This can be used to compare similarity between realization, by viewing simulated results as "observations". A use case is to rank all realizations in an ensemble for the similarity to a certain mean profile, f.ex. FOPT. The result of the function is a observation unit added to the smry observations, with values at every date. Arguments: realization: ScratchRealization or VirtualRealization containing data for constructing the virtual observation smryvector: string with a name of a specific summary vector to be used time_index: string with timeresolution, typically 'yearly' or 'monthly'. smryerror: float, constant value to be used as the measurement error for every date. """ dataseries = realization.get_smry( column_keys=[smryvector], time_index=time_index )[smryvector] # In the context of this function, datetimes are not supported. Ensure dates: if isinstance(dataseries.index, pd.DatetimeIndex): dataseries.index = dataseries.index.date # If the index is a list of strings (object), # it is ok (assuming ISO-datestrings) # Modify the observation object (self) if "smry" not in self.observations.keys(): self.observations["smry"] = [] # Empty list # Construct a virtual observation with observation units # at every timestep: virtobs = {} virtobs["key"] = smryvector virtobs["comment"] = "Virtual observation unit constructed from " + str( realization ) virtobs["observations"] = [] for date, value in dataseries.items(): virtobs["observations"].append( {"value": value, "error": smryerror, "date": date} ) self.observations["smry"].append(virtobs)
def __len__(self): """Return the number of observation units present""" # This is not correctly implemented yet.. return len(self.observations.keys()) @property def empty(self): """Decide if the observation set is empty An empty observation set is has zero observation unit count""" return not self.__len__()
[docs] def keys(self): """Return a list of observation units present. This list might change into a dataframe in the future, but calling len() on its results should always return the number of observation units.""" return self.observations.keys()
def _realization_mismatch(self, real): """Compute the mismatch from the current loaded observations to a realization. Supports both ScratchRealizations and VirtualRealizations The returned dataframe contains the columns: * OBSTYPE - category/type of the observation * OBSKEY - name of the observation key * LABEL - if an observation has it * DATE - only where relevant. * OBSINDEX - where an enumeration is relevant * MISMATCH - signed difference between value and result * L1 - absolute difference * L2 - absolute difference squared * SIGN - True if positive difference * SIMVALUE - the simulated value, not for smryh * OBSVALUE - the observed value, not for smryh One row for every observation unit. Args: real : ScratchRealization or VirtualRealization Returns: dataframe: One row per observation unit with mismatch data """ # mismatch_df = pd.DataFrame(columns=['OBSTYPE', 'OBSKEY', # 'DATE', 'OBSINDEX', 'MISMATCH', 'L1', 'L2', 'SIGN']) mismatches = [] for obstype in self.observations.keys(): for obsunit in self.observations[obstype]: # (list) if obstype == "txt": try: sim_value = real.get_df(obsunit["localpath"])[obsunit["key"]] except (KeyError, ValueError): logger.warning( "%s in %s not found, ignored", obsunit["key"], obsunit["localpath"], ) continue mismatch = float(sim_value - obsunit["value"]) measerror = 1 sign = (mismatch > 0) - (mismatch < 0) mismatches.append( { "OBSTYPE": obstype, "OBSKEY": str(obsunit["localpath"]) + "/" + str(obsunit["key"]), "LABEL": obsunit.get("label", ""), "MISMATCH": mismatch, "L1": abs(mismatch), "L2": abs(mismatch) ** 2, "SIMVALUE": sim_value, "OBSVALUE": obsunit["value"], "MEASERROR": measerror, "SIGN": sign, } ) if obstype == "scalar": try: sim_value = real.get_df(obsunit["key"]) except (KeyError, ValueError): logger.warning( "No data found for scalar: %s, ignored", obsunit["key"] ) continue mismatch = float(sim_value - obsunit["value"]) measerror = 1 sign = (mismatch > 0) - (mismatch < 0) mismatches.append( { "OBSTYPE": obstype, 
"OBSKEY": str(obsunit["key"]), "LABEL": obsunit.get("label", ""), "MISMATCH": mismatch, "L1": abs(mismatch), "SIMVALUE": sim_value, "OBSVALUE": obsunit["value"], "MEASERROR": measerror, "L2": abs(mismatch) ** 2, "SIGN": sign, } ) if obstype == "smryh": if "time_index" in obsunit: if isinstance(obsunit["time_index"], str): sim_hist = real.get_smry( time_index=obsunit["time_index"], column_keys=[obsunit["key"], obsunit["histvec"]], ) elif isinstance( obsunit["time_index"], (datetime.datetime, datetime.date) ): # real.get_smry only allows strings or # list of datetimes as time_index. sim_hist = real.get_smry( time_index=[obsunit["time_index"]], column_keys=[obsunit["key"], obsunit["histvec"]], ) else: logger.error( ( "obsunit-timeindex was not string or date object\n" "Should not be possible, file a bug report" ) ) logger.error(obsunit["time_index"]) logger.error(type(obsunit["time_index"])) time_index_str = str(obsunit["time_index"]) else: sim_hist = real.get_smry( column_keys=[obsunit["key"], obsunit["histvec"]] # (let get_smry() determine the possible time_index) ) time_index_str = "" # If empty df returned, we don't have the data for this: if sim_hist.empty: logger.warning( "No data found for smryh: %s and %s, ignored.", obsunit["key"], obsunit["histvec"], ) continue sim_hist["mismatch"] = ( sim_hist[obsunit["key"]] - sim_hist[obsunit["histvec"]] ) measerror = 1 mismatches.append( { "OBSTYPE": "smryh", "OBSKEY": obsunit["key"], "LABEL": obsunit.get("label", ""), "MISMATCH": sim_hist["mismatch"].sum(), "MEASERROR": measerror, "L1": sim_hist["mismatch"].abs().sum(), "L2": math.sqrt((sim_hist["mismatch"] ** 2).sum()), "TIME_INDEX": time_index_str, } ) if obstype == "smry": # For 'smry', there is a list of # observations (indexed by date) for unit in obsunit["observations"]: try: sim_value = real.get_smry( time_index=[unit["date"]], column_keys=obsunit["key"] )[obsunit["key"]].values[0] except KeyError: logger.warning( "No data found for smry: %s at %s, ignored.", 
obsunit["key"], str(unit["date"]), ) continue mismatch = float(sim_value - unit["value"]) sign = (mismatch > 0) - (mismatch < 0) mismatches.append( { "OBSTYPE": "smry", "OBSKEY": obsunit["key"], "DATE": unit["date"], "MEASERROR": unit["error"], "LABEL": unit.get("label", ""), "MISMATCH": mismatch, "OBSVALUE": unit["value"], "SIMVALUE": sim_value, "L1": abs(mismatch), "L2": abs(mismatch) ** 2, "SIGN": sign, } ) return pd.DataFrame(mismatches) def _realization_misfit(self, real, defaulterrors=False, corr=None): """The misfit value for the observation set Ref: https://wiki.statoil.no/wiki/index.php/RP_HM/Observations#Misfit_function Args: real : a ScratchRealization or a VirtualRealization defaulterrors: (boolean) If set to True, zero measurement errors will be set to 1. corr : correlation or weigthing matrix (numpy matrix). If a list or numpy vector is supplied, it is interpreted as a diagonal matrix. If omitted, the identity matrix is used Returns: float : the misfit value for the observation set and realization """ # noqa if corr: raise NotImplementedError( "correlations in misfit " + "calculation is not supported" ) mismatch = self._realization_mismatch(real) zeroerrors = mismatch["MEASERROR"] < 1e-7 if defaulterrors: mismatch[zeroerrors]["MEASERROR"] = 1 elif zeroerrors.any(): print(mismatch[zeroerrors]) raise ValueError( "Zero measurement error in observation set" + ". can't be used to calculate misfit" ) if "MISFIT" not in mismatch.columns: mismatch["MISFIT"] = mismatch["L2"] / (mismatch["MEASERROR"] ** 2) return mismatch["MISFIT"].sum() def _clean_observations(self): """Verify integrity of observations, remove observation units that cannot be used. Will log warnings about things that are removed. Returns number of usable observation units. Ensure that dates are parsed into datetime.date objects. 
""" supported_categories = ["smry", "smryh", "txt", "scalar", "rft"] # Check top level keys in observations dict: for key in list(self.observations): if key not in supported_categories: self.observations.pop(key) logger.error("Observation category %s not supported", key) continue if not isinstance(self.observations[key], list): logger.error( "Observation category %s did not contain a list, but %s", key, type(self.observations[key]), ) self.observations.pop(key) # Check smryh observations for validity if "smryh" in self.observations.keys(): smryhunits = self.observations["smryh"] if not isinstance(smryhunits, list): logger.warning( "smryh must consist of a list, deleting: %s", str(smryhunits) ) del self.observations["smryh"] for unit in smryhunits: if not isinstance(unit, (dict, OrderedDict)): logger.warning("smryh-units must be dicts, deleting: %s", str(unit)) del smryhunits[smryhunits.index(unit)] continue if not ("key" in unit and "histvec" in unit): logger.warning( ( "smryh units must contain both 'key' and " "'histvec', deleting: %s", str(unit), ) ) del smryhunits[smryhunits.index(unit)] continue # If time_index is not a supported mnemonic, # parse it to a date object if ( "time_index" in unit and unit["time_index"] not in { "raw", "report", "yearly", "daily", "first", "last", "monthly", } and not isinstance(unit["time_index"], datetime.datetime) ): try: unit["time_index"] = dateutil.parser.isoparse( unit["time_index"] ).date() except (TypeError, ValueError) as exception: logger.warning( "Parsing date %s failed with error", (str(unit["time_index"]), str(exception)), ) del smryhunits[smryhunits.index(unit)] continue # If everything has been deleted through cleanup, delete the section if not smryhunits: del self.observations["smryh"] # Check smry observations for validity if "smry" in self.observations.keys(): # We already know that observations['smry'] is a list # Each list element must be a dict with # the mandatory keys 'key' and 'observation' smryunits = 
self.observations["smry"] for unit in smryunits: if not isinstance(unit, (dict, OrderedDict)): logger.warning( "Observation units must be dicts, deleting: %s", str(unit) ) del smryunits[smryunits.index(unit)] continue if not ("key" in unit and "observations" in unit): logger.warning( ( "Observation unit must contain key and", "observations, deleting: %s", ), str(unit), ) del smryunits[smryunits.index(unit)] continue # Check if strings need to be parsed as dates: for observation in unit["observations"]: if isinstance(observation["date"], str): observation["date"] = dateutil.parser.isoparse( observation["date"] ).date() if not isinstance(observation["date"], datetime.date): logger.error("Date not understood %s", str(observation["date"])) continue # If everything is deleted from 'smry', delete it if not smryunits: del self.observations["smry"]
    def to_ert2observations(self):
        """Convert the observation set to an observation
        file for use with Ert 2.x.

        Returns:
            multiline string

        Raises:
            NotImplementedError: always; this exporter is a placeholder
                and has not been written yet.
        """
        raise NotImplementedError
def __repr__(self): """Return a representation of the object The representation is a YAML string that can be used to reinstatiate the object """ return self.to_yaml()
[docs] def to_yaml(self): """Convert the current observations to YAML format Returns: string : Multiline YAML string. """ return yaml.dump(self.observations)
[docs] def to_disk(self, filename): """Write the current observation object to disk In YAML-format. If a new observation object is instantiated from the outputted filename, it should yield identical results in mismatch calculation. Directory structure will be created if not existing. Existing file will be overwritten. Arguments: filename - string with path and filename to be written to""" if not isinstance(filename, str): raise ValueError("Filename must be a string") dirname = os.path.dirname(filename) if not os.path.exists(dirname) and dirname: os.makedirs(dirname) with open(filename, "w") as fhandle: fhandle.write(self.to_yaml())