"""Module containing the ScratchEnsemble class"""
import glob
import logging
import os
import re
import warnings
import dateutil
import numpy as np
import pandas as pd
import yaml
from resdata import ResDataType
from resdata.resfile import ResdataKW
from .ensemblecombination import EnsembleCombination
from .etc import Interaction # noqa
from .realization import ScratchRealization, parse_number
from .util import shortcut2path
from .util.dates import unionize_smry_dates
from .virtualensemble import VirtualEnsemble
from .virtualrealization import VirtualRealization
logger = logging.getLogger(__name__)
class ScratchEnsemble(object):
"""An ensemble is a collection of Realizations.
Ensembles are initialized from path(s) pointing to
filesystem locations containing realizations.
Ensemble objects can be grouped into EnsembleSet.
Realizations in an ensemble are uniquely determined
by their realization index (integer).
Example for initialization:
>>> from fmu import ensemble
>>> ens = ensemble.ScratchEnsemble('ensemblename',
'/scratch/fmu/foobert/r089/casename/realization-*/iter-0')
Upon initialization, only a subset of the files on
disk will be discovered. More files must be explicitly
discovered and/or loaded.
Args:
ensemble_name (str): Name identifier for the ensemble.
Optional to have it consistent with f.ex. iter-0 in the path.
paths (list/str): String or list of strings with wildcards
to file system. Absolute or relative paths.
If omitted, ensemble will be empty unless runpathfile
is used.
realidxregexp (str or regexp): used to deduce the realization index
from the file path. Default tailored for realization-X
runpathfile (str): Filename (absolute or relative) of an ERT
runpath file, consisting of four space separated text fields,
first column is realization index, second column is absolute
or relative path to a realization RUNPATH, third column is
the basename of the Eclipse simulation, relative to RUNPATH.
Fourth column is not used.
runpathfilter (str): If supplied, only the runpaths in
the runpathfile which contain this string will be included.
Use it to select only a specific realization, for example.
autodiscovery (boolean): True by default, meaning that the class
will try to autodiscover data in the realizations. Turn
off to gain more fine-tuned control.
manifest: dict or filename to use for manifest. If filename,
it must be a yaml-file that will be parsed to a single dict.
batch (list): List of functions (load_*) that
should be run at time of initialization for each realization.
Each element is a length-1 dictionary with the function name to run as
the key, and each key's value should be the function arguments as a dict.
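Example initialization from an ERT runpath file (a sketch,
with a hypothetical filename):
>>> ens = ensemble.ScratchEnsemble('ensemblename',
    runpathfile='/scratch/fmu/foobert/r089/casename/runpathfile')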
"""
def __init__(
self,
ensemble_name,
paths=None,
realidxregexp=None,
runpathfile=None,
runpathfilter=None,
autodiscovery=True,
manifest=None,
batch=None,
):
self._name = ensemble_name # ensemble name
self.realizations = {} # dict of ScratchRealization objects,
# indexed by realization indices as integers.
self._ens_df = pd.DataFrame()
self._manifest = {}
self._global_active = None
self._global_size = None
self._global_grid = None
self.obs = None
if isinstance(paths, str):
paths = [paths]
if paths and runpathfile:
logger.error("Cannot initialize from both path and runpathfile")
return
globbedpaths = None
if isinstance(paths, list):
# Glob incoming paths to determine
# paths for each realization (flatten and uniqify)
globbedpaths = [glob.glob(path) for path in paths]
globbedpaths = list({item for sublist in globbedpaths for item in sublist})
if not globbedpaths:
if isinstance(runpathfile, str) and not runpathfile:
logger.warning("Initialized empty ScratchEnsemble")
return
if isinstance(runpathfile, pd.DataFrame) and runpathfile.empty:
logger.warning("Initialized empty ScratchEnsemble")
return
count = None
if globbedpaths:
logger.info("Loading ensemble from dirs: %s", " ".join(globbedpaths))
# Search and locate minimal set of files
# representing the realizations.
count = self.add_realizations(
paths, realidxregexp, autodiscovery=autodiscovery, batch=batch
)
if isinstance(runpathfile, str) and runpathfile:
count = self.add_from_runpathfile(runpathfile, runpathfilter, batch=batch)
if isinstance(runpathfile, pd.DataFrame) and not runpathfile.empty:
count = self.add_from_runpathfile(runpathfile, runpathfilter, batch=batch)
if manifest:
# The _manifest variable is set using a property decorator
self.manifest = manifest
if count:
logger.info("ScratchEnsemble initialized with %d realizations", count)
else:
logger.warning("ScratchEnsemble empty")
def __getitem__(self, realizationindex):
"""Get one of the ScratchRealization objects.
Indexed by integers."""
return self.realizations[realizationindex]
def keys(self):
"""
Return the union of all keys available in realizations.
Keys refer to the realization datastore of internalized
data. The datastore is a dictionary
of dataframes or dicts. Examples would be `parameters.txt`,
`STATUS`, `share/results/tables/unsmry--monthly.csv`
"""
allkeys = set()
for realization in self.realizations.values():
allkeys = allkeys.union(realization.keys())
return list(allkeys)
def add_realizations(
self, paths, realidxregexp=None, autodiscovery=True, batch=None
):
"""Utility function to add realizations to the ensemble.
Realizations are identified by their integer index.
If the realization index already exists, it will be replaced
when calling this function.
This function passes on initialization to ScratchRealization
and stores a reference to those generated objects.
Args:
paths (list/str): String or list of strings with wildcards
to file system. Absolute or relative paths.
autodiscovery (boolean): whether files can be attempted
auto-discovered
batch (list): Batch commands sent to each realization.
Returns:
count (int): Number of realizations successfully added.
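Example (a sketch, with a hypothetical wildcard path):
>>> ens.add_realizations(
    '/scratch/fmu/foobert/r089/casename/realization-*/iter-0')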
"""
if isinstance(paths, list):
globbedpaths = [glob.glob(path) for path in paths]
# Flatten list and uniquify:
globbedpaths = list({item for sublist in globbedpaths for item in sublist})
else:
globbedpaths = glob.glob(paths)
count = 0
for realdir in globbedpaths:
realization = ScratchRealization(
realdir,
realidxregexp=realidxregexp,
autodiscovery=autodiscovery,
batch=batch,
)
if realization.index is None:
logger.critical(
"Could not determine realization index for path %s", realdir
)
if not realidxregexp:
logger.critical("Maybe you need to supply a regexp.")
else:
logger.critical("Your regular expression is maybe wrong.")
else:
count += 1
self.realizations[realization.index] = realization
logger.info("add_realizations() found %d realizations", len(self.realizations))
return count
def add_from_runpathfile(self, runpath, runpathfilter=None, batch=None):
"""Add realizations from a runpath file typically
coming from ERT.
The runpath file is a space separated table with the columns:
* index - integer with realization index
* runpath - string with the full path to the realization
* eclbase - ECLBASE within the runpath (location of DATA file
minus the trailing '.DATA')
* iter - integer with the iteration number.
Args:
runpath (str): Filename, absolute or relative, or
a Pandas DataFrame parsed from a runpath file
runpathfilter (str): A filter which each filepath has to match
in order to be included. Default is None, meaning no filter
batch (list): Batch commands to be sent to each realization.
Returns:
int: Number of successfully added realizations.
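Example (a sketch, with a hypothetical filename, keeping
only realization-3):
>>> ens.add_from_runpathfile('/scratch/fmu/foobert/r089/runpathfile',
    runpathfilter='realization-3')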
"""
prelength = len(self)
if isinstance(runpath, str):
runpath_df = pd.read_csv(
runpath,
sep=r"\s+",
engine="python",
names=["index", "runpath", "eclbase", "iter"],
)
elif isinstance(runpath, pd.DataFrame):
# We got a readymade dataframe. Perhaps a slice.
# Most likely we are getting the slice from an EnsembleSet
# initialization.
runpath_df = runpath
if (
"index" not in runpath_df
or "runpath" not in runpath_df
or "eclbase" not in runpath_df
or "iter" not in runpath_df
):
raise ValueError("runpath dataframe not correct")
for _, row in runpath_df.iterrows():
if runpathfilter and runpathfilter not in row["runpath"]:
continue
logger.info("Adding realization from %s", row["runpath"])
realization = ScratchRealization(
row["runpath"],
index=int(row["index"]),
autodiscovery=False,
batch=batch,
)
# Use the ECLBASE from the runpath file to
# ensure we recognize the correct UNSMRY file
realization.find_files(row["eclbase"] + ".DATA")
realization.find_files(row["eclbase"] + ".UNSMRY")
self.realizations[int(row["index"])] = realization
return len(self) - prelength
def remove_data(self, localpaths):
"""Remove certain datatypes from each realization's
datastore. This modifies the underlying realization
objects, and is equivalent to
>>> del realization[localpath]
on each realization in the ensemble.
Args:
localpaths (string): Full localpaths to
the data, or list of strings.
"""
if isinstance(localpaths, str):
localpaths = [localpaths]
for localpath in localpaths:
for _, real in self.realizations.items():
del real[localpath]
def remove_realizations(self, realindices):
"""Remove specific realizations from the ensemble
Args:
realindices (int or list of ints): The realization
indices to be removed
"""
if isinstance(realindices, int):
realindices = [realindices]
popped = 0
for index in realindices:
self.realizations.pop(index, None)
popped += 1
logger.info("removed %d realization(s)", popped)
def to_virtual(self, name=None):
"""Convert the ScratchEnsemble to a VirtualEnsemble.
This means that all imported data in each realization is
aggregated and stored as dataframes in the returned
VirtualEnsemble
Unless specified, the VirtualEnsemble object will
have the same 'name' as the ScratchEnsemble.
Args:
name (str): Name of the ensemble as virtualized.
"""
if not name:
name = self._name
logger.info("Creating virtual ensemble named %s", str(name))
vens = VirtualEnsemble(name=name, manifest=self.manifest)
for key in self.keys():
vens.append(key, self.get_df(key))
vens.update_realindices()
# __files is the magic name for the dataframe of
# loaded files.
vens.append("__files", self.files)
# Conserve metadata for smry vectors. Build metadata dict for all
# loaded summary vectors.
smrycolumns = [
vens.get_df(key).columns for key in self.keys() if "unsmry" in key
]
smrycolumns = {smrykey for sublist in smrycolumns for smrykey in sublist}
# flatten
meta = self.get_smry_meta(smrycolumns)
if meta:
meta_df = pd.DataFrame.from_dict(meta, orient="index")
meta_df.index.name = "SMRYCOLUMN"
vens.append("__smry_metadata", meta_df.reset_index())
# The metadata dictionary is stored as a DataFrame, with one row per
# summary key (the index is reset due to code simplifications
# in to/from_disk)
return vens
def to_disk(self, filesystempath, delete=False, dumpcsv=True, dumpparquet=True):
"""Dump ensemble data to a directory on disk.
The ScratchEnsemble is first converted to a VirtualEnsemble,
which is then dumped to disk. This function is a
convenience wrapper for to_disk() in VirtualEnsemble.
"""
self.to_virtual().to_disk(filesystempath, delete, dumpcsv, dumpparquet)
@property
def manifest(self):
"""Get the manifest of the ensemble. The manifest is
nothing but a Python dictionary with unspecified content
Returns:
dict
"""
return self._manifest
@manifest.setter
def manifest(self, manifest):
"""Set the manifest of the ensemble. The manifest
is nothing but a Python dictionary with unspecified
content
Args:
manifest: dict or str. If dict, it is used as is, if str it
is assumed to be a filename with YAML syntax which is
parsed into a dict and stored as dict
"""
if isinstance(manifest, dict):
if not manifest:
logger.warning("Empty manifest")
self._manifest = {}
else:
self._manifest = manifest
elif isinstance(manifest, str):
if os.path.exists(manifest):
with open(manifest) as file_handle:
manifest_fromyaml = yaml.safe_load(file_handle)
if not manifest_fromyaml:
logger.warning("Empty manifest")
self._manifest = {}
else:
self._manifest = manifest_fromyaml
else:
logger.error("Manifest file %s not found", manifest)
else:
# NoneType will also end here.
logger.error("Wrong manifest type supplied")
@property
def parameters(self):
"""Build a dataframe of the information in each
realization's parameters.txt.
If no realizations have the file, an empty dataframe is returned.
Returns:
pd.DataFrame
"""
try:
return self.load_txt("parameters.txt")
except KeyError:
return pd.DataFrame()
def load_scalar(self, localpath, convert_numeric=False, force_reread=False):
"""Parse a single value from a file for each realization.
The value can be a string or a number.
Empty files are treated as existing, with an empty string as
the value, different from non-existing files.
Parsing is performed individually in each realization
Args:
localpath (str): path to the text file, relative to each realization
convert_numeric (boolean): If set to True, assume that
the value is numerical, and treat strings as
errors.
force_reread (boolean): Force reread from file system. If
False, repeated calls to this function will
return cached results.
Returns:
pd.DataFrame: Aggregated data over the ensemble. The column 'REAL'
signifies the realization indices, and a column with the same
name as the localpath filename contains the data.
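Example (a sketch, assuming each realization contains a
hypothetical file npv.txt holding a single number):
>>> ens.load_scalar('npv.txt', convert_numeric=True)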
"""
return self.load_file(localpath, "scalar", convert_numeric, force_reread)
def load_txt(self, localpath, convert_numeric=True, force_reread=False):
"""Parse a key-value text file from disk and internalize data
Parses text files of the form
<key> <value>
in each line.
Parsing is performed individually in each realization
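Example (parameters.txt is the typical use case, assuming
the file is present in the realizations):
>>> ens.load_txt('parameters.txt')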
"""
return self.load_file(localpath, "txt", convert_numeric, force_reread)
def load_csv(self, localpath, convert_numeric=True, force_reread=False):
"""For each realization, load a CSV.
The CSV file must be present in at least one realization.
The parsing is done individually for each realization, and
aggregation is on demand (through `get_df()`) and when
this function returns.
Args:
localpath (str): path to the text file, relative to each realization
convert_numeric (boolean): If set to True, numerical columns
will be searched for and have their dtype set
to integers or floats. If scalars, only numerical
data will be loaded.
force_reread (boolean): Force reread from file system. If
False, repeated calls to this function will
return cached results.
Returns:
pd.DataFrame: aggregation of the loaded CSV files. Column 'REAL'
distinguishes each realization's data.
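Example (a sketch, with a hypothetical CSV path relative to
each realization):
>>> ens.load_csv('share/results/tables/inplace.csv')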
"""
return self.load_file(localpath, "csv", convert_numeric, force_reread)
def load_file(self, localpath, fformat, convert_numeric=False, force_reread=False):
"""Function for calling load_file() in every realization
This function may utilize multithreading.
Args:
localpath (str): path to the text file, relative to each realization
fformat (str): string identifying the file format. Supports 'txt'
and 'csv'.
convert_numeric (boolean): If set to True, numerical columns
will be searched for and have their dtype set
to integers or floats. If scalars, only numerical
data will be loaded.
force_reread (boolean): Force reread from file system. If
False, repeated calls to this function will
return cached results.
Returns:
pd.DataFrame: with loaded data aggregated. Column 'REAL'
distinguishes each realization's data.
"""
for index, realization in self.realizations.items():
try:
realization.load_file(localpath, fformat, convert_numeric, force_reread)
except ValueError as exc:
# This would at least occur for unsupported fileformat,
# and that we should not skip.
logger.critical("load_file() failed in realization %d", index)
raise ValueError from exc
except IOError:
# At ensemble level, we allow files to be missing in
# some realizations
logger.warning("Could not read %s for realization %d", localpath, index)
if self.get_df(localpath).empty:
raise ValueError("No ensemble data found for {}".format(localpath))
return self.get_df(localpath)
def find_files(self, paths, metadata=None, metayaml=False):
"""Discover realization files. The files dataframes
for each realization will be updated.
Certain functionality requires up-front file discovery,
e.g. ensemble archiving and ensemble arithmetic.
CSV files for single use do not have to be discovered.
Files containing double-dashes '--' indicate that the double
dashes separate different components with meaning in the
filename. The components are extracted and put into
additional columns "COMP1", "COMP2", etc..
Filetype extension (after the last dot) will be removed
from the last component.
Args:
paths (str or list of str): Filenames (will be globbed)
that are relative to the realization directory.
metadata (dict): metadata to assign for the discovered
files. The keys will be columns, and its values will be
assigned as column values for the discovered files.
metayaml (boolean): Additional possibility of adding metadata from
associated yaml files. Yaml files to be associated to
a specific discovered file can have an optional dot in
front, and must end in .yml, added to the discovered filename.
The yaml file will be loaded as a dict, and have its keys
flattened using the separator '--'. Flattened keys are
then used as column headers in the returned dataframe.
Returns:
pd.DataFrame: with the slice of discovered files in each
realization, tagged with realization index in the column REAL.
Empty dataframe if no files found.
"""
df_list = {}
for index, realization in self.realizations.items():
df_list[index] = realization.find_files(
paths, metadata=metadata, metayaml=metayaml
)
if df_list:
return (
pd.concat(df_list, sort=False)
.reset_index()
.rename(columns={"level_0": "REAL"})
.drop("level_1", axis="columns")
)
return pd.DataFrame()
def __repr__(self):
return "<ScratchEnsemble {}, {} realizations>".format(self.name, len(self))
def __len__(self):
return len(self.realizations)
def get_smrykeys(self, vector_match=None):
"""
Return the union of all Eclipse Summary vector names
across all realizations.
If any requested key/pattern does not match anything, it is
silently ignored.
Args:
vector_match (str or list of str): Wildcards for vectors
to obtain. If None, all vectors are returned
Returns:
list of str: Matched summary vectors. Empty list if no
summary file or no matched summary file vectors
"""
if isinstance(vector_match, str):
vector_match = [vector_match]
result = set()
for index, realization in self.realizations.items():
eclsum = realization.get_eclsum()
if eclsum:
if vector_match is None:
result = result.union(set(eclsum.keys()))
else:
for vector in vector_match:
result = result.union(set(eclsum.keys(vector)))
else:
logger.warning("No EclSum available for realization %d", index)
return list(result)
def get_df(self, localpath, merge=None):
"""Load data from each realization and aggregate (vertically)
Data must already have been internalized using
a load_*() function.
Each row is tagged by the realization index in the column 'REAL'
The localpath argument can be shortened, as it will be
looked up using the function shortcut2path()
Args:
localpath (str): refers to the internalized name.
merge (list or str): refer to additional localpath which
will be merged into the dataframe for every realization
Returns:
pd.DataFrame: Merged data from each realization.
Realizations with missing data are ignored.
Raises:
KeyError: if no data is found in any realization.
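Example (a sketch, assuming yearly summary data and
parameters.txt have been internalized; the shortcut is resolved):
>>> dframe = ens.get_df('unsmry--yearly', merge='parameters.txt')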
"""
dflist = {}
for index, realization in self.realizations.items():
try:
data = realization.get_df(localpath, merge=merge)
if isinstance(data, dict):
data = pd.DataFrame(index=[1], data=data)
elif isinstance(data, (str, int, float, np.number)):
data = pd.DataFrame(index=[1], columns=[localpath], data=data)
if isinstance(data, pd.DataFrame):
dflist[index] = data
else:
raise ValueError("Unkown datatype returned " + "from realization")
except (KeyError, ValueError):
# No logging here, those error messages
# should have appeared at construction using load_*()
pass
if dflist:
# Merge a dictionary of dataframes. The dict key is
# the realization index, and end up in a MultiIndex
dframe = pd.concat(dflist, sort=False).reset_index()
dframe.rename(columns={"level_0": "REAL"}, inplace=True)
del dframe["level_1"] # This is the indices from each real
return dframe
raise KeyError("No data found for " + localpath)
def load_smry(
self,
time_index="raw",
column_keys=None,
stacked=None,
cache_eclsum=None,
start_date=None,
end_date=None,
include_restart=True,
):
"""
Fetch and internalize summary data from all realizations.
The fetched summary data will be cached/internalized by each
realization object, and can be retrieved through get_df().
The name of the internalized dataframe is "unsmry--" + a string
for the time index, 'monthly', 'yearly', 'daily' or 'raw'.
Multiple calls to this function with different time indices
will lead to multiple internalized dataframes being stored, so
your ensemble can contain both a yearly and a monthly dataset.
There is no requirement for the column_keys to be consistent, but
care should be taken if they differ.
If you create a virtual ensemble of this ensemble object, all
internalized summary data will be kept, as opposed to if
you have retrieved it through get_smry()
Wraps around Realization.load_smry() which wraps around
resdata.summary.Summary.pandas_frame()
The default time_index for ensembles is 'raw', the same
as for realizations, yielding the raw Eclipse report dates.
Args:
time_index (str or list of DateTime):
If defaulted, the raw Eclipse report times will be used.
If a string is supplied, it is passed to
get_smry_dates() in order to obtain a time index,
typically 'monthly', 'daily' or 'yearly'.
column_keys (str or list of str): column key wildcards. Default is '*'
which will match all vectors in the Eclipse output.
stacked (boolean): determining the dataframe layout. If
true, the realization index is a column, and dates are repeated
for each realization in the DATES column.
If false, a dictionary of dataframes is returned, indexed
by vector name, and with realization index as columns.
This only works when time_index is the same for all
realizations. Not implemented yet!
cache_eclsum (boolean): Boolean for whether we should cache the EclSum
objects. Set to False if you cannot keep all EclSum files in
memory simultaneously
start_date (str or date): First date to include.
Dates prior to this date will be dropped, a supplied
start_date will always be included. Overridden if time_index
is 'first' or 'last'. If string, use ISO-format, YYYY-MM-DD.
end_date (str or date): Last date to be included.
Dates past this date will be dropped, supplied
end_date will always be included. Overridden if time_index
is 'first' or 'last'. If string, use ISO-format, YYYY-MM-DD.
include_restart (boolean): boolean sent to resdata for whether restart
files should be traversed.
Returns:
pd.DataFrame: Summary vectors for the ensemble, or
a dict of dataframes if stacked=False.
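Example (a sketch, internalizing monthly field oil vectors
using a hypothetical wildcard):
>>> ens.load_smry(column_keys=['FOP*'], time_index='monthly')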
"""
if stacked is not None:
warnings.warn(
(
"stacked option to load_smry() is deprecated and "
"will be removed in fmu-ensemble v2.0.0"
),
FutureWarning,
)
else:
stacked = True
if not stacked:
raise NotImplementedError
if cache_eclsum is not None:
warnings.warn(
(
"cache_eclsum option to load_smry() is deprecated and "
"will be removed in fmu-ensemble v2.0.0"
),
FutureWarning,
)
# Future: Multithread this!
for realidx, realization in self.realizations.items():
# We do not store the returned DataFrames here,
# instead we look them up afterwards using get_df()
# Downside is that we have to compute the name of the
# cached object as it is not returned.
logger.info("Loading smry from realization %s", realidx)
realization.load_smry(
time_index=time_index,
column_keys=column_keys,
cache_eclsum=cache_eclsum,
start_date=start_date,
end_date=end_date,
include_restart=include_restart,
)
if isinstance(time_index, (list, np.ndarray)):
time_index = "custom"
elif time_index is None:
time_index = "raw"
return self.get_df("share/results/tables/unsmry--" + time_index + ".csv")
def get_volumetric_rates(self, column_keys=None, time_index=None, time_unit=None):
"""Compute volumetric rates from cumulative summary vectors
Column names that are not referring to cumulative summary
vectors are silently ignored.
A Dataframe is returned with volumetric rates, that is rate
values that can be summed up to the cumulative version. The
'T' in the column name is switched with 'R'. If you ask for
FOPT, you will get FOPR in the returned dataframe.
Rates in the returned dataframe are valid **forwards** in time,
opposed to rates coming directly from the Eclipse simulator which
are valid backwards in time.
If time_unit is set, the rates will be scaled to represent
either daily, monthly or yearly rates. These will sum up to the
cumulative as long as you multiply with the correct number
of days, months or year between each consecutive date index.
Month lengths and leap years are correctly handled.
Args:
column_keys (str or list of str): cumulative summary vectors
time_index (str or list of datetimes):
time_unit: str or None. If None, the rates returned will
be the difference in cumulative between each included
time step (where the time interval can vary arbitrarily)
If set to 'days', 'months' or 'years', the rates will
be scaled to represent a daily, monthly or yearly rate that
is compatible with the date index and the cumulative data.
Returns:
pd.DataFrame: analogous to the dataframe returned by get_smry().
Empty dataframe if no data found.
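Example (a hypothetical sketch, computing daily-scaled rates
from the cumulative vector FOPT):
>>> rates = ens.get_volumetric_rates(column_keys='FOPT',
    time_index='monthly', time_unit='days')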
"""
vol_dfs = []
for realidx, real in self.realizations.items():
vol_real = real.get_volumetric_rates(
column_keys=column_keys, time_index=time_index, time_unit=time_unit
)
if "DATE" not in vol_real.columns and vol_real.index.name == "DATE":
# This should be true, if not we might be in trouble.
vol_real.reset_index(inplace=True)
vol_real.insert(0, "REAL", realidx)
vol_dfs.append(vol_real)
if not vol_dfs:
return pd.DataFrame()
return pd.concat(vol_dfs, ignore_index=True, sort=False)
def filter(self, localpath, inplace=True, **kwargs):
"""Filter realizations or data within realizations
Calling this function can return a copy with fewer
realizations, or remove realizations from the current object.
Typical usage is to require that parameters.txt is present, or
that the OK file is present.
It is also possible to require a certain scalar to have a specific
value, for example filtering on a specific sensitivity case.
Args:
localpath (string): pointing to the data for which the filtering
applies. If no other arguments are given, only realizations
containing this data key are kept.
key (str): A certain key within a realization dictionary that is
required to be present. If a value is also provided, this
key must be equal to this value
value (str, int or float): The value a certain key must equal. Floating
point comparisons are not robust.
column (str): Name of a column in tabular data. If columncontains is
not specified, this means that this column must be present
columncontains (str, int or float):
A value that the specific column must include.
inplace (boolean): Indicating if the current object should have its
realizations stripped, or if a copy should be returned.
Default true.
Returns:
If inplace=True, then nothing will be returned.
If inplace=False, a VirtualEnsemble fulfilling the filter
will be returned.
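Example (a sketch: keep only realizations with the OK file,
then filter on a hypothetical parameter key and value):
>>> ens.filter('OK')
>>> vens = ens.filter('parameters.txt', key='SENSNAME',
    value='faultseal', inplace=False)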
"""
deletethese = []
keepthese = []
for realidx, realization in self.realizations.items():
if inplace:
if not realization.contains(localpath, **kwargs):
deletethese.append(realidx)
elif realization.contains(localpath, **kwargs):
keepthese.append(realidx)
if inplace:
logger.info("Removing realizations %s", deletethese)
if deletethese:
self.remove_realizations(deletethese)
return self
filtered = VirtualEnsemble(self.name + " filtered")
for realidx in keepthese:
filtered.add_realization(self.realizations[realidx])
return filtered
def drop(self, localpath, **kwargs):
"""Delete elements from internalized data.
Shortcuts are allowed for localpath. If the data pointed to is
a DataFrame, you can delete columns, or rows containing certain
elements
If the data pointed to is a dictionary, keys can be deleted.
Args:
localpath: string, path to internalized data. If no other options
are supplied, that dataset is deleted in its entirety
column: string with a column name to drop. Only for dataframes
columns: list of strings with column names to delete
rowcontains: rows where one column contains this string will be
dropped. The comparison is on strings only, and all cells in
the dataframe are converted to strings for the comparison.
Thus it might work on dates, but be careful with numbers.
key: string with a keyname in a dictionary. Will not work for
dataframes
keys: list of strings of keys to delete from a dictionary
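Example (a sketch, removing a column from internalized monthly
summary data, assuming it has been loaded):
>>> ens.drop('unsmry--monthly', column='FOPT')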
"""
if shortcut2path(self.keys(), localpath) not in self.keys():
raise ValueError("%s not found" % localpath)
for _, realization in self.realizations.items():
try: # noqa: SIM105
realization.drop(localpath, **kwargs)
except ValueError:
pass # Allow localpath to be missing in some realizations
def process_batch(self, batch=None):
"""Process a list of functions to run/apply
This is equivalent to calling each function individually
but this enables more efficient concurrency. It is meant
to be used for functions that modify the realization
object, not for functions that already return a dataframe.
Args:
batch (list): Each list element is a dictionary with one key,
being a function name; the value per key is a dict with keyword
arguments to be supplied to each function.
Returns:
ScratchEnsemble: This ensemble object (self), for it
to be picked up by ProcessPoolExecutor and pickling.
"""
for realization in self.realizations.values():
realization.process_batch(batch)
return self
def apply(self, callback, **kwargs):
"""Callback functionality: apply a function to every realization.
The supplied function handle will be handed over to
each underlying realization object. The function supplied
must return a Pandas DataFrame. The function can obtain
the realization object in the kwargs dictionary through
the key 'realization'.
Args:
callback: function handle
kwargs: dictionary where 'realization' and
'localpath' is reserved, will be forwarded
to the callbacked function
localpath: str, optional if the data is to be internalized
in each realization object.
Returns:
pd.DataFrame, aggregated result of the supplied function
on each realization.
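Example (a minimal sketch with a hypothetical callback that
counts discovered files in each realization):
>>> def file_count(**kwargs):
...     real = kwargs['realization']
...     return pd.DataFrame([{'filecount': len(real.files)}])
>>> ens.apply(file_count)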
"""
results = []
logger.info("Ensemble %s is running callback %s", self.name, str(callback))
for realidx, realization in self.realizations.items():
result = realization.apply(callback, **kwargs).copy()
# (we took a copy since we are modifying it here:)
# Todo: Avoid copy by concatenating a dict of dataframes
# where realization index is the dict keys.
result["REAL"] = realidx
results.append(result)
return pd.concat(results, sort=False, ignore_index=True)
def get_smry_dates(
self,
freq="monthly",
normalize=True,
start_date=None,
end_date=None,
cache_eclsum=None,
include_restart=True,
):
"""Return list of datetimes for an ensemble according to frequency
Args:
freq: string denoting requested frequency for
the returned list of datetime. 'report' or 'raw' will
yield the sorted union of all valid timesteps for
all realizations. Other valid options are
'daily', 'monthly' and 'yearly'.
'first' will give out the first date (minimum).
'last' will give out the last date (maximum).
normalize: Whether to normalize backwards at the start
and forwards at the end to ensure the raw
date range is covered.
start_date: str or date with first date to include.
Dates prior to this date will be dropped, supplied
start_date will always be included. Overrides
normalized dates. Overridden if freq is 'first' or 'last'.
If string, use ISO-format, YYYY-MM-DD.
end_date: str or date with last date to be included.
Dates past this date will be dropped, supplied
end_date will always be included. Overrides
normalized dates. Overridden if freq is 'first' or 'last'.
If string, use ISO-format, YYYY-MM-DD.
include_restart: boolean sent to resdata for whether restart
files should be traversed.
Returns:
list of datetimes. Empty list if no data found.
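Example (the union of yearly dates over all realizations):
>>> dates = ens.get_smry_dates(freq='yearly')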
"""
if cache_eclsum is not None:
warnings.warn(
(
"cache_eclsum option to get_smry_dates() is deprecated and "
"will be removed in fmu-ensemble v2.0.0"
),
FutureWarning,
)
else:
cache_eclsum = True
# Build list of list of eclsum dates
eclsumsdates = []
for _, realization in self.realizations.items():
if realization.get_eclsum(
cache=cache_eclsum, include_restart=include_restart
):
eclsumsdates.append(
realization.get_eclsum(
cache=cache_eclsum, include_restart=include_restart
).dates
)
return unionize_smry_dates(eclsumsdates, freq, normalize, start_date, end_date)
def get_smry_stats(
self,
column_keys=None,
time_index="monthly",
quantiles=None,
cache_eclsum=None,
start_date=None,
end_date=None,
):
"""
Function to extract the ensemble statistics (Mean, Min, Max, P10, P90)
for a set of simulation summary vectors (column key).
Compared to the agg() function, this function only works on summary
data (time series), and will only operate on actually requested data,
independent of what is internalized. It accesses the summary files
directly and can thus obtain data at any time frequency.
Args:
column_keys: list of column key wildcards
time_index: list of DateTime if interpolation is wanted.
The default is 'monthly'.
If a string is supplied, it is passed to
get_smry_dates() in order to obtain a time index.
quantiles: list of ints between 0 and 100 for which quantiles
to compute. Quantiles refer to scientific standard, which
is opposite to the oil industry convention.
Ask for p10 if you need the oil industry p90.
cache_eclsum: boolean for whether to keep the loaded EclSum
object in memory after data has been loaded.
start_date: str or date with first date to include.
Dates prior to this date will be dropped, supplied
start_date will always be included. Overridden if time_index
is 'first' or 'last'. If string, use ISO-format, YYYY-MM-DD.
end_date: str or date with last date to be included.
Dates past this date will be dropped, supplied
end_date will always be included. Overridden if time_index
is 'first' or 'last'. If string, use ISO-format, YYYY-MM-DD.
Returns:
A MultiIndex dataframe. Outer index is 'minimum', 'maximum',
'mean', 'p10', 'p90', inner index are the dates. Column names
are the different vectors. Quantiles refer to the scientific
standard, opposite to the oil industry convention.
If quantiles are explicitly supplied, the 'pXX'
strings in the outer index are changed accordingly. If no
data is found, return empty DataFrame.
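Example (a sketch, monthly statistics for hypothetical
field oil vectors):
>>> stats = ens.get_smry_stats(column_keys=['FOPT', 'FOPR'],
    time_index='monthly', quantiles=[10, 50, 90])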
"""
if cache_eclsum is not None:
warnings.warn(
(
"cache_eclsum option to get_smry_stats() is deprecated and "
"will be removed in fmu-ensemble v2.0.0"
),
FutureWarning,
)
if quantiles is None:
quantiles = [10, 90]
# Check validity of quantiles to compute:
quantiles = list(map(int, quantiles)) # Potentially raise ValueError
for quantile in quantiles:
if quantile < 0 or quantile > 100:
raise ValueError("Quantiles must be integers " + "between 0 and 100")
# Obtain an aggregated dataframe for only the needed columns over
# the entire ensemble.
dframe = self.get_smry(
time_index=time_index,
column_keys=column_keys,
cache_eclsum=cache_eclsum,
start_date=start_date,
end_date=end_date,
)
if "REAL" in dframe:
dframe = dframe.drop(columns="REAL").groupby("DATE")
else:
logger.warning("No data found for get_smry_stats")
return pd.DataFrame()
# Build a dictionary of dataframes to be concatenated
dframes = {}
dframes["mean"] = dframe.mean()
for quantile in quantiles:
quantile_str = "p" + str(quantile)
dframes[quantile_str] = dframe.quantile(q=quantile / 100.0)
dframes["maximum"] = dframe.max()
dframes["minimum"] = dframe.min()
return pd.concat(dframes, names=["STATISTIC"], sort=False)
def get_wellnames(self, well_match=None):
"""
Return the union of all Eclipse Summary well names
across all realizations. In addition, the list can be
filtered by matches to an input string pattern.
Args:
well_match: `Optional`. String (or list of strings)
with wildcard filter. If None, all wells are returned
Returns:
list of strings with eclipse well names. Empty list if no
summary file or no matched well names.
"""
warnings.warn(
(
"ensemble.get_wellnames() is deprecated and "
"will be removed in fmu-ensemble v2.0.0"
),
FutureWarning,
)
if isinstance(well_match, str):
well_match = [well_match]
result = set()
for _, realization in self.realizations.items():
eclsum = realization.get_eclsum()
if eclsum:
if well_match is None:
result = result.union(set(eclsum.wells()))
else:
for well in well_match:
result = result.union(set(eclsum.wells(well)))
return sorted(result)
def get_groupnames(self, group_match=None):
"""
Return the union of all Eclipse Summary group names
across all realizations.
Optionally, the group names can be filtered.
Args:
group_match: `Optional`. String (or list of strings)
with wildcard filter (globbing). If None, all
groups are returned. Empty string does not match anything.
Returns:
list of strings with Eclipse group names. Empty list if no
summary file or no matched group names.
"""
warnings.warn(
(
"ensemble.get_groupnames() is deprecated and "
"will be removed in fmu-ensemble v2.0.0"
),
FutureWarning,
)
if isinstance(group_match, str):
group_match = [group_match]
result = set()
for _, realization in self.realizations.items():
eclsum = realization.get_eclsum()
if eclsum:
if group_match is None:
result = result.union(set(eclsum.groups()))
else:
for group in group_match:
result = result.union(set(eclsum.groups(group)))
return sorted(result)
def agg(self, aggregation, keylist=None, excludekeys=None):
"""Aggregate the ensemble data into one VirtualRealization
All data will be attempted aggregated. String data will typically
be dropped in the result.
Arguments:
aggregation: string, supported modes are
'mean', 'median', 'p10', 'p90', 'min',
'max', 'std', 'var', and 'pXX' where XX is a number
keylist: list of strings, indicating which keys
in the internal datastore to include. If list is empty
(default), all data will be attempted included.
excludekeys: list of strings that should be excluded if
keylist is empty, otherwise ignored
Returns:
VirtualRealization. Its name will include the aggregation operator
WARNING: This code is duplicated in virtualensemble.py
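Example (aggregate all internalized data into mean and
p90 virtual realizations):
>>> mean_real = ens.agg('mean')
>>> p90_real = ens.agg('p90')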
"""
quantilematcher = re.compile(r"p(\d\d)")
supported_aggs = ["mean", "median", "min", "max", "std", "var"]
if aggregation not in supported_aggs and not quantilematcher.match(aggregation):
raise ValueError(
"{arg} is not a supported ensemble aggregation".format(arg=aggregation)
)
# Generate a new empty object:
vreal = VirtualRealization(self.name + " " + aggregation)
# Determine keys to use
if isinstance(keylist, str):
keylist = [keylist]
if not keylist: # Empty list means all keys.
if not isinstance(excludekeys, list):
excludekeys = [excludekeys]
keys = set(self.keys()) - set(excludekeys)
else:
keys = keylist
for key in keys:
# Aggregate over this ensemble:
# Ensure we operate on fully qualified localpath's
key = shortcut2path(self.keys(), key)
data = self.get_df(key)
# This column should never appear in aggregated data
del data["REAL"]
# Look for data we should group by. This would be beneficial
# to get from a metadata file, and not by pure guesswork.
groupbycolumncandidates = [
"DATE",
"FIPNUM",
"ZONE",
"REGION",
"JOBINDEX",
"Zone",
"Region_index",
]
# Pick up string columns (or non-numeric values)
# (when strings are used as values, this breaks, but it is also
# meaningless to aggregate them. Most likely, strings in columns
# is a label we should group over)
stringcolumns = [x for x in data.columns if data.dtypes[x] == "object"]
groupby = [x for x in groupbycolumncandidates if x in data.columns]
# Add remaining string columns to columns to group by unless
# we are working with the STATUS dataframe, which has too many strings.
if key != "STATUS":
groupby = list(set(groupby + stringcolumns))
# Filter to only numerical columns and groupby columns:
numerical_and_groupby_cols = list(
set(list(groupby) + list(data.select_dtypes(include="number").columns))
)
data = data[numerical_and_groupby_cols]
dtypes = data.dtypes.unique()
if not (int in dtypes or float in dtypes):
logger.info("No numerical data to aggregate in %s", key)
continue
if groupby:
logger.info("Grouping %s by %s", key, groupby)
aggobject = data.groupby(groupby)
else:
aggobject = data
if quantilematcher.match(aggregation):
quantile = int(quantilematcher.match(aggregation).group(1))
aggregated = aggobject.quantile(quantile / 100.0)
else:
# Passing through the variable 'aggregation' to
# Pandas, thus supporting more than we have listed in
# the docstring.
aggregated = aggobject.agg(aggregation)
if groupby:
aggregated.reset_index(inplace=True)
# We have to recognize scalars.
if len(aggregated) == 1 and aggregated.index.values[0] == key:
aggregated = parse_number(aggregated.values[0])
vreal.append(key, aggregated)
return vreal
@property
def files(self):
"""Return a concatenation of files in each realization"""
filedflist = []
for realidx, realization in self.realizations.items():
realfiles = realization.files.copy()
realfiles.insert(0, "REAL", realidx)
filedflist.append(realfiles)
return pd.concat(filedflist, ignore_index=True, sort=False)
@property
def name(self):
"""The ensemble name."""
return self._name
@name.setter
def name(self, newname):
if isinstance(newname, str):
self._name = newname
else:
raise ValueError("Name input is not a string")
def __sub__(self, other):
"""Substract another ensemble from this"""
result = EnsembleCombination(ref=self, sub=other)
return result
def __add__(self, other):
"""Add another ensemble to this"""
result = EnsembleCombination(ref=self, add=other)
return result
def __mul__(self, other):
"""Scale this ensemble with a scalar value"""
result = EnsembleCombination(ref=self, scale=float(other))
return result
def __rsub__(self, other):
"""Substract another ensemble from this"""
result = EnsembleCombination(ref=self, sub=other)
return result
def __radd__(self, other):
"""Add another ensemble to this"""
result = EnsembleCombination(ref=self, add=other)
return result
def __rmul__(self, other):
"""Scale this ensemble with a scalar value"""
result = EnsembleCombination(ref=self, scale=float(other))
return result
def get_realindices(self):
"""Return the integer indices for realizations in this ensemble
Returns:
list of integers
"""
# Convert to a list to match the documented return type
return list(self.realizations.keys())
def get_smry(
self,
time_index=None,
column_keys=None,
cache_eclsum=None,
start_date=None,
end_date=None,
include_restart=True,
):
"""
Aggregates summary data from all realizations.
Wraps around Realization.get_smry() which wraps around
resdata.summary.Summary.pandas_frame()
Args:
time_index: list of DateTime if interpolation is wanted.
The default is None, which returns the raw Eclipse report times.
If a string with an ISO-8601 date is supplied, that date
is used directly, otherwise the string is assumed to indicate
a wanted frequency for dates, daily, weekly, monthly or yearly,
and will be sent to get_smry_dates()
column_keys: list of column key wildcards
cache_eclsum: boolean for whether to cache the EclSum
objects. Defaults to True. Set to False if
not enough memory to keep all summary files in memory.
start_date: str or date with first date to include.
Dates prior to this date will be dropped, supplied
start_date will always be included. Overridden if time_index
is 'first' or 'last'.
end_date: str or date with last date to be included.
Dates past this date will be dropped, supplied
end_date will always be included. Overridden if time_index
is 'first' or 'last'.
include_restart: boolean sent to resdata for whether restart
files should be traversed.
Returns:
A DataFrame of summary vectors for the ensemble. The column
REAL with integers is added to distinguish realizations. If
there are no realizations, an empty DataFrame is returned.
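Example (daily values for one vector, without internalizing):
>>> smry = ens.get_smry(column_keys='FOPT', time_index='daily')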
"""
if cache_eclsum is not None:
warnings.warn(
(
"cache_eclsum option to get_smry() is deprecated and "
"will be removed in fmu-ensemble v2.0.0"
),
FutureWarning,
)
if isinstance(time_index, str):
# Try interpreting as ISO-date:
try:
parseddate = dateutil.parser.isoparse(time_index)
time_index = [parseddate]
# But this should fail when a frequency string is supplied:
except ValueError:
time_index = self.get_smry_dates(
time_index,
start_date=start_date,
end_date=end_date,
include_restart=include_restart,
)
dflist = []
for index, realization in self.realizations.items():
dframe = realization.get_smry(
time_index=time_index,
column_keys=column_keys,
cache_eclsum=cache_eclsum,
include_restart=include_restart,
)
dframe.insert(0, "REAL", index)
dframe.index.name = "DATE"
dflist.append(dframe)
if dflist:
return pd.concat(dflist, sort=False).reset_index()
return pd.DataFrame()
def get_eclgrid(self, props, report=0, agg="mean", active_only=False):
"""
Returns the grid (i,j,k) and (x,y), and any requested init
and/or unrst property. The values are aggregated over the
ensemble (mean/ std currently supported).
Args:
props: list of column key wildcards
report: int. For unrst props only, the report step for a given date.
Use the function get_unrst_report_dates to get an overview
of the report steps available.
agg: String. "mean" or "std".
active_only: bool. True for active cells only.
Returns:
pd.DataFrame indexed by (i, j, k, active), with grid geometry
columns and one column per requested property.
"""
warnings.warn(
(
"ensemble.get_eclgrid() is deprecated and "
"will be removed in fmu-ensemble v2.0.0"
),
FutureWarning,
)
egrid_reals = [
real for real in self.realizations.values() if real.get_grid() is not None
]
# Pick a "random" reference realization for grids.
ref = egrid_reals[0]
grid_index = ref.get_grid_index(active_only=active_only)
corners = ref.get_grid_corners(grid_index)
centre = ref.get_grid_centre(grid_index)
dframe = grid_index.reset_index().join(corners).join(centre)
dframe["realizations_active"] = self.global_active.numpy_copy()
for prop in props:
logger.info("Reading the grid property: %s", prop)
if prop in self.init_keys:
dframe[prop] = self.get_init(prop, agg=agg)
if prop in self.unrst_keys:
dframe[prop] = self.get_unrst(prop, agg=agg, report=report)
dframe.drop("index", axis=1, inplace=True)
dframe.set_index(["i", "j", "k", "active"])
return dframe
@property
def global_active(self):
"""
:returns: A ResdataKW with, for each cell,
the number of realizations where the cell is active.
"""
warnings.warn(
(
"ensemble.global_active() is deprecated and "
"will be removed in fmu-ensemble v2.0.0"
),
FutureWarning,
)
if not self._global_active:
self._global_active = ResdataKW(
"eactive", self.global_size, ResDataType.RD_INT
)
for realization in self.realizations.values():
if realization.get_grid() is not None:
self._global_active += realization.actnum
return self._global_active
@property
def global_size(self):
"""
:returns: global size of the realizations in the Ensemble. see
:func:`fmu_postprocessing.modelling.Realization.global_size()`.
"""
warnings.warn(
(
"ensemble.global_size() is deprecated and "
"will be removed in fmu-ensemble v2.0.0"
),
FutureWarning,
)
if not self.realizations:
return 0
if self._global_size is None:
egrid_reals = [
real
for real in self.realizations.values()
if real.get_grid() is not None
]
ref = egrid_reals[0]
self._global_size = ref.global_size
return self._global_size
def _get_grid_index(self, active=True):
"""
Get the grid index from the first present realization
Returns: The grid of the ensemble, see
:func:`fmu.ensemble.Realization.get_grid()`.
"""
# ensemble._get_grid_index() is deprecated and
# will be removed in fmu-ensemble v2.0.0
if not self.realizations:
return None
return list(self.realizations.values())[0].get_grid_index(active=active)
@property
def init_keys(self):
"""Return all keys available in the Eclipse INIT file"""
warnings.warn(
(
"ensemble.init_keys() is deprecated and "
"will be removed in fmu-ensemble v2.0.0"
),
FutureWarning,
)
if not self.realizations:
return None
all_keys = set.union(
*[
set(realization.get_init().keys())
for _, realization in self.realizations.items()
if realization.get_grid() is not None
]
)
return all_keys
@property
def unrst_keys(self):
"""Return keys availaible in the Eclipse UNRST file"""
warnings.warn(
(
"ensemble.unrst_keys() is deprecated and "
"will be removed in fmu-ensemble v2.0.0"
),
FutureWarning,
)
if not self.realizations:
return None
all_keys = set.union(
*[
set(realization.get_unrst().keys())
for _, realization in self.realizations.items()
if realization.get_unrst() is not None
]
)
return all_keys
def get_unrst_report_dates(self):
"""Returns UNRST report step and the corresponding date"""
warnings.warn(
(
"ensemble.get_unrst_report_dates() is deprecated and "
"will be removed in fmu-ensemble v2.0.0"
),
FutureWarning,
)
if not self.realizations:
return None
all_report_dates = set.union(
*[
set(realization.report_dates)
for _, realization in self.realizations.items()
if realization.get_unrst() is not None
]
)
all_report_dates = list(all_report_dates)
all_report_dates.sort()
dframe = pd.DataFrame(all_report_dates, columns=["Dates"])
dframe.index.names = ["Report"]
return dframe
def get_init(self, prop, agg):
"""
:param prop: A time independent property.
:returns: pd.Series with the aggregated (``mean`` or ``std``)
values for the given property, one value per cell.
:raises ValueError: If prop is not found.
"""
warnings.warn(
(
"ensemble.get_init() is deprecated and "
"will be removed in fmu-ensemble v2.0.0"
),
FutureWarning,
)
if agg == "mean":
mean = self._keyword_mean(prop, self.global_active)
return pd.Series(mean.numpy_copy(), name=prop)
if agg == "std":
std_dev = self._keyword_std_dev(prop, self.global_active, mean)
return pd.Series(std_dev.numpy_copy(), name=prop)
return pd.Series()
def get_unrst(self, prop, report, agg):
"""
:param prop: A time dependent property, see
`fmu_postprocessing.modelling.SimulationGrid.TIME_DEPENDENT`.
:returns: pd.Series with the aggregated (``mean`` or ``std``)
values for the given property at the report step.
:raises ValueError: If prop is not in `TIME_DEPENDENT`.
"""
warnings.warn(
(
"ensemble.get_unrst() is deprecated and "
"will be removed in fmu-ensemble v2.0.0"
),
FutureWarning,
)
if agg == "mean":
mean = self._keyword_mean(prop, self.global_active, report=report)
return pd.Series(mean.numpy_copy(), name=prop)
if agg == "std":
std_dev = self._keyword_std_dev(
prop, self.global_active, mean, report=report
)
return pd.Series(std_dev.numpy_copy(), name=prop)
return pd.Series()
def _keyword_mean(self, prop, global_active, report=None):
"""
:returns: Mean values of keywords.
:param prop: Name of resulting Keyword.
:param global_active: A ResdataKW with, for each cell, the number of
realizations where the cell is active.
:param report: Report step for unrst keywords
"""
# ensemble._keyword_mean() is deprecated and
# will be removed in fmu-ensemble v2.0.0
mean = ResdataKW(prop, len(global_active), ResDataType.RD_FLOAT)
if report:
for _, realization in self.realizations.items():
if realization.get_unrst() is not None:
mean += realization.get_global_unrst_keyword(prop, report)
mean.safe_div(global_active)
return mean
for _, realization in self.realizations.items():
if realization.get_grid() is not None:
mean += realization.get_global_init_keyword(prop)
mean.safe_div(global_active)
return mean
def _keyword_std_dev(self, prop, global_active, mean, report=0):
"""
:returns: Standard deviation of keywords.
:param prop: Name of resulting Keyword.
:param global_active: A ResdataKW with, for each cell, the number of
realizations where the cell is active.
:param mean: Mean of keywords.
:param report: Report step for unrst keywords.
"""
# ensemble._keyword_std_dev() is deprecated and
# will be removed in fmu-ensemble v2.0.0
std_dev = ResdataKW(prop, len(global_active), ResDataType.RD_FLOAT)
if report:
for _, realization in self.realizations.items():
real_prop = realization.get_global_unrst_keyword(prop, report)
std_dev.add_squared(real_prop - mean)
std_dev.safe_div(global_active)
return std_dev.isqrt()
for _, realization in self.realizations.items():
if realization.get_grid() is not None:
real_prop = realization.get_global_init_keyword(prop)
std_dev.add_squared(real_prop - mean)
std_dev.safe_div(global_active)
return std_dev.isqrt()