Source code for fmu.ensemble.ensembleset

"""Module for book-keeping and aggregation of ensembles"""

import glob
import logging
import os
import re
import warnings

import numpy as np
import pandas as pd

from .ensemble import ScratchEnsemble, VirtualEnsemble

logger = logging.getLogger(__name__)


class EnsembleSet(object):
    """An ensemble set is any collection of ensemble objects

    Ensemble objects are ScratchEnsembles or VirtualEnsembles.

    There is support for initializing from a file structure with both
    iterations and batches, but the concepts of iterations and batches
    are not kept in an EnsembleSet, where each ensemble is uniquely
    identified by the ensemble name. To keep the iteration (and batch)
    concept, it must be embedded into the ensemble name.

    The init method will make an ensemble set, either as empty, or
    from a list of already initialized ensembles, or directly from the
    filesystem, or from an ERT runpath file. Only one of these
    initialization modes can be used.

    Args:
        name: Chosen name for the ensemble set. Can be used if
            aggregated at a higher level.
        ensembles: list of Ensemble objects. Can be omitted.
        frompath: string or list of strings with filesystem path.
            Will be globbed by default. If no realizations or iterations
            are detected after globbing, the standard glob
            'realization-*/iter-*' will be used.
        runpathfile: string with path to an ert runpath file which will
            be used to look up realizations and iterations.
        realidxregexp: regular expression object that will be used to
            determine the realization index (must be an integer) from a
            path component (split by /). The default fits realization-*
        iterregexp: similar to realidxregexp, but the result will always
            be treated as a string.
        batchregexp: similar to iterregexp, for future support of an
            extra level similar to iterations
        autodiscovery: boolean, sent to initializing Realization objects,
            instructing them on whether certain files should be
            auto-discovered.
        batch (list): List of functions (load_*) that should be run
            at time of initialization for each realization. Each element
            is a length 1 dictionary with the function name to run as
            the key, and each key's value should be the function
            arguments as a dict.
    """

    def __init__(
        self,
        name=None,
        ensembles=None,
        frompath=None,
        runpathfile=None,
        realidxregexp=None,
        iterregexp=None,
        batchregexp=None,
        autodiscovery=True,
        batch=None,
    ):
        self._name = name
        self._ensembles = {}  # Dictionary indexed by each ensemble's name.

        if (ensembles and frompath) or (ensembles and runpathfile):
            logger.error(
                "EnsembleSet only supports one initialization mode: "
                "a list of ensembles, a list of paths, or "
                "an ert runpath file"
            )
            raise ValueError

        # Check consistency in arguments.
        if not name:
            logger.warning("EnsembleSet name defaulted to 'ensembleset'")
            name = "ensembleset"
            self._name = name
        if name and not isinstance(name, str):
            logger.error("Name of EnsembleSet must be a string")
            return
        if frompath and not isinstance(frompath, str):
            logger.error("frompath arg given to EnsembleSet must be a string")
            return
        if ensembles and not isinstance(ensembles, list):
            logger.error("Ensembles supplied to EnsembleSet must be a list")
            return

        if ensembles and isinstance(ensembles, list):
            if batch:
                logger.warning(
                    "Batch commands not processed when loading finished ensembles"
                )
            for ensemble in ensembles:
                if isinstance(ensemble, (ScratchEnsemble, VirtualEnsemble)):
                    self._ensembles[ensemble.name] = ensemble
                else:
                    logger.warning("Supplied object was not an ensemble")
            if not self._ensembles:
                logger.warning("No ensembles added to EnsembleSet")

        if frompath:
            self.add_ensembles_frompath(
                frompath,
                realidxregexp,
                iterregexp,
                batchregexp,
                autodiscovery=autodiscovery,
                batch=batch,
            )
            if not self._ensembles:
                logger.warning("No ensembles added to EnsembleSet")

        if runpathfile:
            if not os.path.exists(runpathfile):
                logger.error("Could not open runpath file %s", runpathfile)
                raise IOError
            self.add_ensembles_fromrunpath(runpathfile, batch=batch)
            if not self._ensembles:
                logger.warning("No ensembles added to EnsembleSet")

    @property
    def name(self):
        """Return the name of the ensembleset, as initialized"""
        return self._name

    def __len__(self):
        return len(self._ensembles)

    def __getitem__(self, name):
        return self._ensembles[name]

    def __repr__(self):
        return "<EnsembleSet {}, {} ensembles:\n{}>".format(
            self.name, len(self), self._ensembles
        )

    @property
    def ensemblenames(self):
        """Return a list of named ensembles in this set"""
        return list(self._ensembles.keys())
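    # A minimal usage sketch (the scratch path below is hypothetical,
    # assuming the standard realization-*/iter-* layout on disk):
    #
    #   ensset = EnsembleSet("mycase", frompath="/scratch/user/mycase")
    #   len(ensset)             # number of ensembles found, e.g. 2
    #   ensset.ensemblenames    # e.g. ['iter-0', 'iter-1']
    #   iter0 = ensset["iter-0"]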
    def keys(self):
        """Return the union of all keys available in the ensembles.

        Keys refer to the realization datastore, a dictionary
        of dataframes or dicts.
        """
        allkeys = set()
        for ensemble in self._ensembles.values():
            allkeys = allkeys.union(ensemble.keys())
        return allkeys
    def add_ensembles_frompath(
        self,
        paths,
        realidxregexp=None,
        iterregexp=None,
        batchregexp=None,
        autodiscovery=True,
        batch=None,
    ):
        """Convenience function for adding multiple ensembles.

        Args:
            paths: str or list of strings with path to the directory
                containing the realization-*/iter-* structure
            realidxregexp: Supply a regexp that can extract the realization
                index as an *integer* from path components. The expression
                will be tested on individual path components from
                right to left.
            iterregexp: Similar to realidxregexp, but is allowed to
                match strings.
            batchregexp: Similar to realidxregexp, but is allowed to
                match strings.
            autodiscovery: boolean, sent to initializing Realization objects,
                instructing them on whether certain files should be
                auto-discovered.
            batch (list): List of functions (load_*) that should be run
                at time of initialization for each realization. Each element
                is a length 1 dictionary with the function name to run as
                the key, and each key's value should be the function
                arguments as a dict.
        """
        # Try to catch the most common use case and make that easy:
        if isinstance(paths, str):
            if (
                "realization" not in paths
                and not realidxregexp
                and not iterregexp
                and not batchregexp
            ):
                logger.info(
                    "Adding realization-*/iter-* path pattern to case directory"
                )
                paths = paths + "/realization-*/iter-*"
            paths = [paths]

        if not realidxregexp:
            realidxregexp = re.compile(r"realization-(\d+)")
        if isinstance(realidxregexp, str):
            realidxregexp = re.compile(realidxregexp)
        if not iterregexp:
            # Alternative regexp that extracts the iteration
            # as an integer:
            # iterregexp = re.compile(r'iter-(\d+)')
            # Default regexp that will add 'iter-' to the
            # ensemble name:
            iterregexp = re.compile(r"(iter-\d+)")
        if isinstance(iterregexp, str):
            iterregexp = re.compile(iterregexp)
        if not batchregexp:
            batchregexp = re.compile(r"batch-(\d+)")
        if isinstance(batchregexp, str):
            batchregexp = re.compile(batchregexp)

        # Check that the regexps can actually return something:
        if realidxregexp.groups != 1:
            logger.critical("Invalid regular expression for realization")
            return
        if iterregexp.groups != 1:
            logger.critical("Invalid regular expression for iter")
            return
        if batchregexp.groups != 1:
            logger.critical("Invalid regular expression for batch")
            return

        globbedpaths = [glob.glob(path) for path in paths]
        globbedpaths = list({item for sublist in globbedpaths for item in sublist})

        # Build a temporary dataframe of globbed paths, with columns for
        # the realization index and the iteration we found
        # (extended with a third level called 'batch'):
        paths_df = pd.DataFrame(columns=["path", "real", "iter", "batch"])
        for path in globbedpaths:
            real = None
            iterr = None  # 'iter' is a builtin..
            batchname = None
            for path_comp in reversed(path.split(os.path.sep)):
                realmatch = re.match(realidxregexp, path_comp)
                if realmatch:
                    real = int(realmatch.group(1))
                    break
            for path_comp in reversed(path.split(os.path.sep)):
                itermatch = re.match(iterregexp, path_comp)
                if itermatch:
                    iterr = str(itermatch.group(1))
                    break
            for path_comp in reversed(path.split(os.path.sep)):
                batchmatch = re.match(batchregexp, path_comp)
                if batchmatch:
                    batchname = str(batchmatch.group(1))
                    break
            df_row = {"path": path, "real": real, "iter": iterr, "batch": batchname}
            paths_df = pd.concat([paths_df, pd.DataFrame([df_row])], ignore_index=True)
        paths_df.fillna(value="Unknown", inplace=True)

        # Initialize ensemble objects for each iteration found:
        iters = sorted(paths_df["iter"].unique())
        logger.info("Identified %s iterations, %s", len(iters), iters)
        for iterr in iters:
            # The realization indices *must* be unique for these
            # chosen paths, otherwise we are most likely in
            # trouble:
            iterslice = paths_df[paths_df["iter"] == iterr]
            if len(iterslice["real"].unique()) != len(iterslice):
                logger.error("Repeated realization indices for iter %s", iterr)
                logger.error("Some realizations will be ignored")
            pathsforiter = sorted(paths_df[paths_df["iter"] == iterr]["path"].values)
            # iterr might contain the 'iter-' prefix,
            # depending on the chosen regexp:
            ens = ScratchEnsemble(
                str(iterr),
                pathsforiter,
                realidxregexp=realidxregexp,
                autodiscovery=autodiscovery,
                batch=batch,
            )
            self._ensembles[ens.name] = ens
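    # A sketch of adding ensembles with a custom directory layout
    # (hypothetical paths and patterns):
    #
    #   ensset = EnsembleSet("mycase")
    #   ensset.add_ensembles_frompath(
    #       "/scratch/user/mycase/real_*/it_*",
    #       realidxregexp=re.compile(r"real_(\d+)"),
    #       iterregexp=re.compile(r"(it_\d+)"),
    #   )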
    def add_ensembles_fromrunpath(self, runpathfile, batch=None):
        """Add one or many ensembles from an ERT runpath file.

        autodiscovery is not an argument; it is by default set
        to False for runpath files, since the location of the
        UNSMRY file is given in the runpath file.
        """
        runpath_df = pd.read_csv(
            runpathfile,
            sep=r"\s+",
            engine="python",
            names=["index", "runpath", "eclbase", "iter"],
        )
        # If the index and iter columns are all integers (typically zero
        # padded), Pandas has converted them to int64. If not, they will
        # be strings (objects).
        for iterr in runpath_df["iter"].unique():
            # Make a runpath slice, and initialize from that:
            ens_runpath = runpath_df[runpath_df["iter"] == iterr]
            ens = ScratchEnsemble(
                "iter-" + str(iterr),
                runpathfile=ens_runpath,
                autodiscovery=False,
                batch=batch,
            )
            self._ensembles[ens.name] = ens
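    # A sketch of initializing from an ERT runpath file (hypothetical
    # filename; the file is whitespace-separated with one row per
    # realization: index, runpath, eclbase and iter, as read above):
    #
    #   ensset = EnsembleSet("mycase", runpathfile="/scratch/user/mycase.runpath")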
    def add_ensemble(self, ensembleobject):
        """Add a single ensemble to the ensemble set.

        The name is taken from the ensembleobject.
        """
        if ensembleobject.name in self._ensembles:
            raise ValueError(
                "The name {} already exists in the EnsembleSet".format(
                    ensembleobject.name
                )
            )
        self._ensembles[ensembleobject.name] = ensembleobject
    @property
    def parameters(self):
        """Build a dataframe of the information in each realization's
        parameters.txt.

        If no realizations have the file, an empty dataframe
        is returned.

        Returns:
            pd.DataFrame
        """
        try:
            return self.get_df("parameters.txt")
        except KeyError:
            return pd.DataFrame()
    def load_scalar(self, localpath, convert_numeric=False, force_reread=False):
        """Parse a single value from a file.

        The value can be a string or a number. Empty files
        are treated as existing, with an empty string as the value,
        different from non-existing files.

        Parsing is performed individually in each ensemble
        and realization."""
        for ensname, ensemble in self._ensembles.items():
            try:
                ensemble.load_scalar(localpath, convert_numeric, force_reread)
            except ValueError:
                # This will occur if an ensemble is missing the file.
                # At ensemble level that is an error, but at EnsembleSet
                # level it is only a warning.
                logger.warning(
                    "Ensemble %s did not contain the data %s", ensname, localpath
                )
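    # A sketch: internalize a scalar from a hypothetical one-line file
    # in every realization, then aggregate it:
    #
    #   ensset.load_scalar("npv.txt", convert_numeric=True)
    #   ensset.get_df("npv.txt")  # one row per ensemble/realization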
    def load_txt(self, localpath, convert_numeric=True, force_reread=False):
        """Parse and internalize a txt-file from disk.

        Parses text files of the form
        <key> <value>
        in each line."""
        return self.load_file(localpath, "txt", convert_numeric, force_reread)
    def load_csv(self, localpath, convert_numeric=True, force_reread=False):
        """Parse and internalize a CSV file from disk"""
        return self.load_file(localpath, "csv", convert_numeric, force_reread)
    def load_file(self, localpath, fformat, convert_numeric=True, force_reread=False):
        """Internal function for load_*()"""
        for ensname, ensemble in self._ensembles.items():
            try:
                ensemble.load_file(localpath, fformat, convert_numeric, force_reread)
            except (KeyError, ValueError):
                # This will occur if an ensemble is missing the file.
                # At ensemble level that is an error, but at EnsembleSet
                # level it is only a warning.
                logger.warning(
                    "Ensemble %s did not contain the data %s", ensname, localpath
                )
        return self.get_df(localpath)
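    # A sketch of loading a CSV file present in each realization
    # (hypothetical localpath, relative to each realization root):
    #
    #   vol_df = ensset.load_csv("share/results/tables/volumes.csv")
    #   vol_df["ENSEMBLE"].unique()  # one entry per ensemble name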
    def get_df(self, localpath, merge=None):
        """Collect contents of dataframes from each ensemble.

        Args:
            localpath (str): path to the text file, relative to
                each realization
            merge (list or str): refer to additional localpath(s)
                which will be merged into the dataframe for every
                ensemble/realization. Merging happens before
                aggregation.
        """
        ensdflist = []
        for _, ensemble in self._ensembles.items():
            try:
                ensdf = ensemble.get_df(localpath, merge=merge)
                ensdf.insert(0, "ENSEMBLE", ensemble.name)
                ensdflist.append(ensdf)
            except (KeyError, ValueError):
                # Happens if an ensemble is missing some data.
                # A warning has already been issued at initialization.
                pass
        if ensdflist:
            return pd.concat(ensdflist, sort=False)
        raise KeyError("No data found for {} or merge failed".format(localpath))
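    # A sketch of retrieving internalized data, merged per realization
    # with parameters.txt before aggregation (hypothetical localpath):
    #
    #   df = ensset.get_df(
    #       "share/results/tables/volumes.csv", merge="parameters.txt"
    #   )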
    def drop(self, localpath, **kwargs):
        """Delete elements from internalized data.

        Shortcuts are allowed for localpath. If the data pointed to is
        a DataFrame, you can delete columns, or rows containing
        certain elements.

        If the data pointed to is a dictionary, keys can be deleted.

        Args:
            localpath: string, path to internalized data. If no other
                options are supplied, that dataset is deleted in its
                entirety.
            column: string with a column name to drop. Only for dataframes.
            columns: list of strings with column names to delete.
            rowcontains: rows where one column contains this string will
                be dropped. The comparison is on strings only, and all
                cells in the dataframe are converted to strings for the
                comparison. Thus it might work on dates, but be careful
                with numbers.
            key: string with a key name in a dictionary. Will not work
                for dataframes.
            keys: list of strings of keys to delete from a dictionary.
        """
        if self.shortcut2path(localpath) not in self.keys():
            raise ValueError("%s not found" % localpath)
        for _, ensemble in self._ensembles.items():
            try:  # noqa: SIM105
                ensemble.drop(localpath, **kwargs)
            except ValueError:
                pass  # Allow localpath to be missing in some ensembles.
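    # A sketch of dropping a single column from an internalized CSV
    # (hypothetical localpath and column name):
    #
    #   ensset.drop("share/results/tables/volumes.csv", column="GIIP_GAS")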
    def remove_data(self, localpaths):
        """Remove certain datatypes from each ensemble's/realization's
        datastore. This modifies the underlying realization
        objects, and is equivalent to

        >>> del realization[localpath]

        on each realization in each ensemble.

        Args:
            localpaths (string): Full localpath to the data,
                or list of strings.
        """
        for _, ensemble in self._ensembles.items():
            ensemble.remove_data(localpaths)
    def process_batch(self, batch=None):
        """Process a list of functions to run/apply.

        This is equivalent to calling each function individually,
        but it enables more efficient concurrency. It is meant to be
        used for functions that modify the realization object, not for
        functions that already return a dataframe.

        Args:
            batch (list): Each list element is a dictionary with one key,
                being a function name; the value per key is a dict with
                keyword arguments to be supplied to that function.
        """
        for ensemble in self._ensembles.values():
            if isinstance(ensemble, ScratchEnsemble):
                ensemble.process_batch(batch)
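    # A sketch of the batch format: a list of one-key dicts, where each
    # key is a load_* function name and the value holds its keyword
    # arguments (the localpath below is hypothetical):
    #
    #   ensset.process_batch(
    #       batch=[
    #           {"load_scalar": {"localpath": "npv.txt"}},
    #           {"load_smry": {"column_keys": ["FOPT"], "time_index": "yearly"}},
    #       ]
    #   )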
    def apply(self, callback, **kwargs):
        """Callback functionality: apply a function to every realization.

        The supplied function handle will be handed over to
        each underlying ScratchEnsemble object, which in turn
        will hand it over to its realization objects. The function
        supplied must return a Pandas DataFrame. The function can obtain
        the realization object in the kwargs dictionary through the key
        'realization'.

        Any VirtualEnsembles are ignored. Operations on dataframes in
        VirtualEnsembles can be done using the apply() functionality
        in pd.DataFrame.

        Args:
            callback: function handle
            kwargs: dictionary where 'realization' and 'localpath' are
                reserved; will be forwarded to the callbacked function
            localpath: str, optional if the data is to be internalized
                in each realization object.

        Returns:
            pd.DataFrame, aggregated result of the supplied function
            on each realization.
        """
        results = []
        for ens_name, ensemble in self._ensembles.items():
            if isinstance(ensemble, ScratchEnsemble):
                result = ensemble.apply(callback, **kwargs)
                result["ENSEMBLE"] = ens_name
                results.append(result)
        return pd.concat(results, sort=False, ignore_index=True)
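    # A sketch of a custom callback; the realization object is available
    # through kwargs['realization'], and the callback must return a
    # DataFrame (the computation below is a hypothetical placeholder):
    #
    #   def my_calculation(kwargs):
    #       real = kwargs["realization"]
    #       return pd.DataFrame([{"SOMESTAT": 42}])
    #
    #   stats = ensset.apply(my_calculation)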
    def shortcut2path(self, shortpath):
        """
        Convert short pathnames to fully qualified pathnames
        within the datastore.

        If the fully qualified localpath is
            'share/results/volumes/simulator_volume_fipnum.csv'
        then you can also access this with these alternatives:
         * simulator_volume_fipnum
         * simulator_volume_fipnum.csv
         * share/results/volumes/simulator_volume_fipnum

        but only as long as there is no ambiguity. In case
        of ambiguity, the shortpath will be returned.

        CODE DUPLICATION from realization.py
        """
        basenames = [os.path.basename(key) for key in self.keys()]
        if basenames.count(shortpath) == 1:
            short2path = {os.path.basename(x): x for x in self.keys()}
            return short2path[shortpath]
        noexts = ["".join(x.split(".")[:-1]) for x in self.keys()]
        if noexts.count(shortpath) == 1:
            short2path = {"".join(x.split(".")[:-1]): x for x in self.keys()}
            return short2path[shortpath]
        basenamenoexts = [
            "".join(os.path.basename(x).split(".")[:-1]) for x in self.keys()
        ]
        if basenamenoexts.count(shortpath) == 1:
            short2path = {
                "".join(os.path.basename(x).split(".")[:-1]): x for x in self.keys()
            }
            return short2path[shortpath]
        # If we get here, we did not find anything that
        # this shorthand could point to. Return as is, and let the
        # calling function handle further errors.
        return shortpath
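    # A sketch of shortcut resolution, using the docstring's example
    # localpath (assuming it is the only candidate in the datastore):
    #
    #   ensset.shortcut2path("simulator_volume_fipnum")
    #   # -> 'share/results/volumes/simulator_volume_fipnum.csv'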
    def get_csv_deprecated(self, filename):
        """Load CSV data from each realization in each ensemble,
        and aggregate.

        Args:
            filename: string, filename local to the realization

        Returns:
            dataframe: Merged CSV from each realization.
                Realizations with missing data are ignored.
                Empty dataframe if no data is found.
        """
        dflist = []
        for _, ensemble in self._ensembles.items():
            dframe = ensemble.get_csv(filename)
            dframe["ENSEMBLE"] = ensemble.name
            dflist.append(dframe)
        return pd.concat(dflist, sort=False)
    def load_smry(
        self,
        time_index="raw",
        column_keys=None,
        cache_eclsum=None,
        start_date=None,
        end_date=None,
    ):
        """
        Fetch summary data from all ensembles.

        Wraps around Ensemble.load_smry() which wraps
        Realization.load_smry(), which wraps
        resdata.summary.Summary.pandas_frame()

        The time index is determined at realization level. If you
        ask for 'monthly', you will from each realization get its
        months. At ensemble or ensembleset level, the number of
        monthly report dates can vary between realizations.

        The per-realization results will be cached by each
        realization object, and can be retrieved through get_df().

        Args:
            time_index: list of DateTime if interpolation is wanted.
                The default is 'raw', which returns the raw Eclipse
                report times. If a string is supplied, it is passed
                to get_smry_dates() in order to obtain a time index.
            column_keys: list of column key wildcards
            cache_eclsum: Boolean for whether we should cache the
                Summary objects. Set to False if you cannot keep
                all Summary files in memory simultaneously.
            start_date: str or date with first date to include.
                Dates prior to this date will be dropped; the supplied
                start_date will always be included. Overridden if
                time_index is 'first' or 'last'.
            end_date: str or date with last date to be included.
                Dates past this date will be dropped; the supplied
                end_date will always be included. Overridden if
                time_index is 'first' or 'last'.

        Returns:
            A DataFrame of summary vectors for the ensembleset.
            The column 'ENSEMBLE' will denote each ensemble's name.
        """
        if cache_eclsum is not None:
            warnings.warn(
                (
                    "cache_eclsum option to load_smry() is deprecated and "
                    "will be removed in fmu-ensemble v2.0.0"
                ),
                FutureWarning,
            )
        # Future: Multithread this:
        for _, ensemble in self._ensembles.items():
            ensemble.load_smry(
                time_index=time_index,
                column_keys=column_keys,
                cache_eclsum=cache_eclsum,
                start_date=start_date,
                end_date=end_date,
            )
        if isinstance(time_index, (list, np.ndarray)):
            time_index = "custom"
        elif time_index is None:
            time_index = "raw"
        return self.get_df("share/results/tables/unsmry--" + time_index + ".csv")
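    # A sketch of loading summary vectors at a monthly time index
    # (FOPT and FOPR are standard Eclipse summary vector mnemonics):
    #
    #   smry = ensset.load_smry(
    #       time_index="monthly", column_keys=["FOPT", "FOPR"]
    #   )
    #   # The same data can later be retrieved with:
    #   ensset.get_df("share/results/tables/unsmry--monthly.csv")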
    def get_smry(
        self,
        time_index=None,
        column_keys=None,
        cache_eclsum=None,
        start_date=None,
        end_date=None,
    ):
        """Aggregate summary data from all ensembles.

        Wraps around Ensemble.get_smry(), which wraps around
        Realization.get_smry(), which wraps around
        resdata.summary.Summary.pandas_frame()

        Args:
            time_index: list of DateTime if interpolation is wanted.
                The default is None, which returns the raw Eclipse
                report times. If a string is supplied, it is passed
                to get_smry_dates() in order to obtain a time index.
            column_keys: list of column key wildcards
            cache_eclsum: boolean for whether to cache the Summary
                objects. Defaults to False. Set to True if there is
                enough memory to keep all realizations' summary files
                in memory at once. This will speed up subsequent
                operations.
            start_date: str or date with first date to include.
                Dates prior to this date will be dropped; the supplied
                start_date will always be included. Overridden if
                time_index is 'first' or 'last'.
            end_date: str or date with last date to be included.
                Dates past this date will be dropped; the supplied
                end_date will always be included. Overridden if
                time_index is 'first' or 'last'.

        Returns:
            A DataFrame of summary vectors for the EnsembleSet.
            The column ENSEMBLE will distinguish the different
            ensembles by their respective names.
        """
        if cache_eclsum is not None:
            warnings.warn(
                (
                    "cache_eclsum option to get_smry() is deprecated and "
                    "will be removed in fmu-ensemble v2.0.0"
                ),
                FutureWarning,
            )
        smrylist = []
        for _, ensemble in self._ensembles.items():
            smry = ensemble.get_smry(
                time_index, column_keys, cache_eclsum, start_date, end_date
            )
            smry.insert(0, "ENSEMBLE", ensemble.name)
            smrylist.append(smry)
        if smrylist:
            return pd.concat(smrylist, sort=False)
        return pd.DataFrame()
    def get_smry_dates(
        self, freq="monthly", cache_eclsum=None, start_date=None, end_date=None
    ):
        """Return a list of datetimes from an ensembleset.

        Datetimes from each realization in each ensemble can be
        returned raw, or be resampled.

        Args:
            freq: string denoting the requested frequency for the
                returned list of datetimes. 'report' will yield the
                sorted union of all valid timesteps for all
                realizations. Other valid options are 'daily',
                'monthly' and 'yearly'.
            cache_eclsum: Boolean for whether we should cache the
                Summary objects. Set to False if you cannot keep
                all Summary files in memory simultaneously.
            start_date: str or date with first date to include.
                Dates prior to this date will be dropped; the supplied
                start_date will always be included. Overridden if
                time_index is 'first' or 'last'.
            end_date: str or date with last date to be included.
                Dates past this date will be dropped; the supplied
                end_date will always be included. Overridden if
                time_index is 'first' or 'last'.

        Returns:
            list of datetime.date.
        """
        if cache_eclsum is not None:
            warnings.warn(
                (
                    "cache_eclsum option to get_smry_dates() is deprecated and "
                    "will be removed in fmu-ensemble v2.0.0"
                ),
                FutureWarning,
            )
        rawdates = set()
        for _, ensemble in self._ensembles.items():
            rawdates = rawdates.union(
                ensemble.get_smry_dates(
                    freq="report",
                    cache_eclsum=cache_eclsum,
                    start_date=start_date,
                    end_date=end_date,
                )
            )
        rawdates = list(rawdates)
        rawdates.sort()
        if freq == "report":
            return rawdates
        # Later optimization: Wrap eclsum.start_date in the
        # ensemble object.
        start_date = min(rawdates)
        end_date = max(rawdates)
        pd_freq_mnemonics = {"monthly": "MS", "yearly": "YS", "daily": "D"}
        if freq not in pd_freq_mnemonics:
            raise ValueError("Requested frequency %s not supported" % freq)
        datetimes = pd.date_range(start_date, end_date, freq=pd_freq_mnemonics[freq])
        # Convert from Pandas' datetime64 to datetime.date:
        return [x.date() for x in datetimes]
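    # A sketch of building one common yearly time index spanning all
    # realizations in the set, then using it for interpolated summary
    # retrieval (FOPT is a standard Eclipse summary vector mnemonic):
    #
    #   dates = ensset.get_smry_dates(freq="yearly")
    #   smry = ensset.get_smry(time_index=dates, column_keys=["FOPT"])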
    def get_wellnames(self, well_match=None):
        """Return a union of all Eclipse summary well names in all
        ensembles' realizations.

        Optionally, the well names can be filtered.

        Args:
            well_match: `Optional`. String (or list of strings)
                with wildcard filter (globbing). If None, all wells
                are returned. An empty string will not match anything.

        Returns:
            list of strings with Eclipse well names. Empty list if
            no summary file or no matched well names.
        """
        warnings.warn(
            "ensembleset.get_wellnames() is deprecated and "
            "will be removed in later versions.",
            FutureWarning,
        )
        result = set()
        for _, ensemble in self._ensembles.items():
            result = result.union(ensemble.get_wellnames(well_match))
        return sorted(result)