"""Module for the ScratchRealization class
A realization is a set of results from one subsurface model
realization. A realization can either be defined from
its output files from the FMU run on the file system,
be computed from other realizations, or be
an archived realization.
"""
import copy
import glob
import json
import logging
import os
import re
import warnings
from datetime import date, datetime, time
import dateutil
import numpy as np
import pandas as pd
import yaml
from resdata.grid import Grid
from resdata.rd_util import FileMode
from resdata.resfile import ResdataFile
from resdata.summary import Summary
from .realizationcombination import RealizationCombination
from .util import flatten, parse_number, shortcut2path
from .util.dates import unionize_smry_dates
from .util.rates import compute_volumetric_rates
from .virtualrealization import VirtualRealization
HAVE_ECL2DF = False
try:
import ecl2df
HAVE_ECL2DF = True
except ImportError:
HAVE_ECL2DF = False
HAVE_RES2DF = False
try:
import res2df
HAVE_RES2DF = True
except ImportError:
HAVE_RES2DF = False
logger = logging.getLogger(__name__)
class ScratchRealization(object):
r"""A representation of results still present on disk
ScratchRealizations point to the filesystem for their
contents.
A realization must at least contain a STATUS file.
Additionally, jobs.json and parameters.txt will be attempted
loaded by default.
The realization is defined by the pointers to the filesystem.
When asked for, this object will return data from the
filesystem (or from cache if already computed).
The files dataframe is the central filesystem pointer
repository for the object. It will at least contain
the columns
* FULLPATH absolute path to a file
* FILETYPE filename extension (after last dot)
* LOCALPATH relative filename inside realization directory
* BASENAME filename only. No path. Includes extension
This dataframe is available as a read-only property from the object
Args:
path (str): absolute or relative path to a directory
containing a realization's files.
realidxregexp (re/str): a compiled regular expression which
is used to determine the realization index (integer)
from the path. First match is the index.
Default: realization-(\d+)
Only needs to match path components.
If a string is supplied, it will be attempted
compiled into a regular expression.
index (int): the realization index to be used, will
override anything else.
autodiscovery (boolean): whether the realization should try to
auto-discover certain data (UNSMRY files in standard location)
batch (list): List of functions (load_*) that
should be run at time of initialization. Each element is a
length-1 dictionary with the function name to run as the key,
and each key's value should be the function arguments as a dict.
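Example of construction (a sketch only; the path and the batch
commands are hypothetical):
>>> real = ScratchRealization(
... "/scratch/mycase/realization-3/iter-0",
... batch=[{"load_smry": {"column_keys": "FOPT", "time_index": "yearly"}}],
... )
>>> real.index # parsed from the hypothetical "realization-3" path component
3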
"""
def __init__(
self, path, realidxregexp=None, index=None, autodiscovery=True, batch=None
):
self._origpath = os.path.abspath(path)
self.index = None
self._autodiscovery = autodiscovery
if not realidxregexp:
realidxregexp = re.compile(r"realization-(\d+)")
# Try to compile the regexp on behalf of the user.
if isinstance(realidxregexp, str):
try:
realidxregexp = re.compile(realidxregexp)
except re.error as err:
raise ValueError("Supplied realidxregexp not valid") from err
self.files = pd.DataFrame(
columns=["FULLPATH", "FILETYPE", "LOCALPATH", "BASENAME"]
)
self._eclsum = None # Placeholder for caching
self._eclsum_include_restart = None # Flag for cached object
# The datastore for internalized data. Dictionary
# indexed by filenames (local to the realization).
# values in the dictionary can be either dicts or dataframes
self.data = {}
self._eclinit = None
self._eclunrst = None
self._eclgrid = None
self._ecldata = None
self._actnum = None
abspath = os.path.abspath(path)
if index is None:
for path_comp in reversed(os.path.abspath(path).split(os.path.sep)):
realidxmatch = re.match(realidxregexp, path_comp)
if realidxmatch:
self.index = int(realidxmatch.group(1))
break
else:
logger.warning(
(
"Could not determine realization "
"index for %s, "
"this cannot be inserted in an Ensemble"
),
abspath,
)
logger.warning("Maybe you need to use index=<someinteger>")
self.index = None
else:
self.index = int(index)
# Now look for some common files, but don't require any
if os.path.exists(os.path.join(abspath, "STATUS")):
filerow = {
"LOCALPATH": "STATUS",
"FILETYPE": "STATUS",
"FULLPATH": os.path.join(abspath, "STATUS"),
"BASENAME": "STATUS",
}
self.files = pd.concat(
[self.files, pd.DataFrame([filerow])], ignore_index=True
)
self.load_status()
else:
logger.warning("No STATUS file, %s", abspath)
if os.path.exists(os.path.join(abspath, "jobs.json")):
filerow = {
"LOCALPATH": "jobs.json",
"FILETYPE": "json",
"FULLPATH": os.path.join(abspath, "jobs.json"),
"BASENAME": "jobs.json",
}
self.files = pd.concat(
[self.files, pd.DataFrame([filerow])], ignore_index=True
)
if os.path.exists(os.path.join(abspath, "OK")):
self.load_scalar("OK")
if os.path.exists(os.path.join(abspath, "parameters.txt")):
self.load_txt("parameters.txt")
if batch:
self.process_batch(batch)
logger.info("Initialized %s", abspath)
def process_batch(self, batch):
"""Process a list of functions to run/apply
This is equivalent to calling each function individually
but this enables more efficient concurrency. It is meant
to be used for functions that modify the realization
object, not for functions that return a dataframe directly.
Args:
batch (list): Each list element is a dictionary with one key,
being a function name; the value for each key is a dict with keyword
arguments to be supplied to that function.
Returns:
ScratchRealization: This realization object (self), for it
to be picked up by ProcessPoolExecutor and pickling.
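Example (a sketch; the chosen functions and their arguments are
illustrative only):
>>> real.process_batch(
... [
... {"load_txt": {"localpath": "parameters.txt"}},
... {"load_smry": {"column_keys": "F*", "time_index": "monthly"}},
... ]
... )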
"""
assert isinstance(batch, list)
allowed_functions = [
"apply",
"find_files",
"load_smry",
"load_txt",
"load_file",
"load_csv",
"load_status",
"load_scalar",
]
for cmd in batch:
assert isinstance(cmd, dict)
assert len(cmd) == 1
fn_name = list(cmd.keys())[0]
logger.info(
"Batch processing (#%d): %s with args %s",
self.index,
fn_name,
str(cmd[fn_name]),
)
if fn_name not in allowed_functions:
logger.warning("process_batch skips illegal function: %s", fn_name)
continue
assert isinstance(cmd[fn_name], dict)
getattr(self, fn_name)(**cmd[fn_name])
return self
def runpath(self):
"""Return the runpath ("root") of the realization
Returns:
str: the filesystem path which at least existed
at time of object initialization.
"""
return self._origpath
def to_virtual(self, name=None, deepcopy=True):
"""Convert the current ScratchRealization object
to a VirtualRealization
Args:
name (str): used as label
deepcopy (boolean): Set to true if you want to continue
to manipulate the ScratchRealization object
afterwards without affecting the virtual realization.
Defaults to True. False will give faster execution.
"""
if not name:
name = self._origpath
if deepcopy:
vreal = VirtualRealization(name, copy.deepcopy(self.data))
else:
vreal = VirtualRealization(name, self.data)
# Conserve metadata for smry vectors. Build metadata dict for all
# loaded summary vectors.
smrycolumns = [
self.get_df(key).columns for key in self.keys() if "unsmry" in key
]
smrycolumns = {smrykey for sublist in smrycolumns for smrykey in sublist}
meta = self.get_smry_meta(list(smrycolumns))
if meta:
meta_df = pd.DataFrame.from_dict(meta, orient="index")
meta_df.index.name = "SMRYCOLUMN"
vreal.append("__smry_metadata", meta_df.reset_index())
return vreal
def load_file(self, localpath, fformat, convert_numeric=True, force_reread=False):
"""
Parse and internalize files from disk.
Several file formats are supported:
- txt (one key-value pair per line)
- csv
- scalar (one number or one string in the first line)
"""
if fformat == "txt":
self.load_txt(localpath, convert_numeric, force_reread)
elif fformat == "csv":
self.load_csv(localpath, convert_numeric, force_reread)
elif fformat == "scalar":
self.load_scalar(localpath, convert_numeric, force_reread)
else:
raise ValueError("Unsupported file format %s" % fformat)
def load_scalar(
self,
localpath,
convert_numeric=False,
force_reread=False,
comment=None,
skip_blank_lines=True,
skipinitialspace=True,
):
"""Parse a single value from a file.
The value can be a string or a number.
Empty files are treated as existing, with an empty string as
the value, different from non-existing files.
pandas.read_csv() is used to parse the contents; the args
'comment', 'skip_blank_lines', and 'skipinitialspace' are passed on
to that function.
Args:
localpath: path to the file, local to the realization
convert_numeric: If True, non-numerical content will be thrown away
force_reread: Reread the data from disk.
Returns:
str/number: the value read from the file.
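Example (a sketch; the filename "npv.txt" is hypothetical):
>>> npv = real.load_scalar("npv.txt", convert_numeric=True)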
"""
fullpath = os.path.abspath(os.path.join(self._origpath, localpath))
if not os.path.exists(fullpath):
raise IOError("File not found: " + fullpath)
if fullpath in self.files["FULLPATH"].values and not force_reread:
# Return cached version
return self.data[localpath]
if fullpath not in self.files["FULLPATH"].values:
filerow = {
"LOCALPATH": localpath,
"FILETYPE": localpath.split(".")[-1],
"FULLPATH": fullpath,
"BASENAME": os.path.split(localpath)[-1],
}
self.files = pd.concat(
[self.files, pd.DataFrame([filerow])], ignore_index=True
)
try:
value = pd.read_csv(
fullpath,
header=None,
sep="DONOTSEPARATEANYTHING *%magic%*",
engine="python",
skip_blank_lines=skip_blank_lines,
skipinitialspace=skipinitialspace,
comment=comment,
).iloc[0, 0]
except pd.errors.EmptyDataError:
value = ""
if convert_numeric:
value = parse_number(value)
if not isinstance(value, str):
self.data[localpath] = value
# In case we are re-reading, we must
# ensure there is no value present now:
elif localpath in self.data:
del self.data[localpath]
else:
self.data[localpath] = value
return value
def load_txt(self, localpath, convert_numeric=True, force_reread=False):
"""Parse a txt file with
<key> <value>
in each line.
The txt file will be internalized in a dict and will be
stored if the object is archived. Recommended file
extension is 'txt'.
Common usage is internalization of parameters.txt which
happens by default, but this can be used for all txt files.
The parsed data is returned as a dict. At the ensemble level
the same function returns a dataframe.
There is no getter for the constructed data; access the
internal data store (the 'data' dictionary) directly, or rerun this function
(except for parameters.txt, for which there is a property
called 'parameters').
Values with spaces are not supported, similar
to ERT's CSV_EXPORT1. Any remainder of a line is silently ignored.
Args:
localpath: path, local to the realization, of the txt file
convert_numeric: defaults to True, will try to parse
all values as integers, if not, then floats, and
strings as the last resort.
force_reread: Force reread from file system. If
False, repeated calls to this function will
return cached results.
Returns:
dict: Dictionary with the parsed values. Values will be returned as
integers, floats or strings. If convert_numeric
is False, all values are strings.
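Example (a sketch; the key name is hypothetical):
>>> params = real.load_txt("parameters.txt")
>>> seed = params.get("RMS_SEED")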
"""
fullpath = os.path.abspath(os.path.join(self._origpath, localpath))
if not os.path.exists(fullpath):
raise IOError("File not found: " + fullpath)
if fullpath in self.files["FULLPATH"].values and not force_reread:
# Return cached version
return self.data[localpath]
if fullpath not in self.files["FULLPATH"].values:
filerow = {
"LOCALPATH": localpath,
"FILETYPE": localpath.split(".")[-1],
"FULLPATH": fullpath,
"BASENAME": os.path.split(localpath)[-1],
}
self.files = pd.concat(
[self.files, pd.DataFrame([filerow])], ignore_index=True
)
try:
keyvalues = pd.read_csv(
fullpath,
sep=r"\s+",
index_col=0,
dtype=str,
usecols=[0, 1],
header=None,
)[1].to_dict()
except pd.errors.EmptyDataError:
keyvalues = {}
if convert_numeric:
for key in keyvalues:
keyvalues[key] = parse_number(keyvalues[key])
self.data[localpath] = keyvalues
return keyvalues
def load_csv(self, localpath, convert_numeric=True, force_reread=False):
"""Parse a CSV file as a DataFrame
Data will be stored as a DataFrame for later
access or storage.
Filename is relative to realization root.
Args:
localpath: path, local to the realization, of the CSV file
convert_numeric: defaults to True, will try to parse
all values as integers, if not, then floats, and
strings as the last resort.
force_reread: Force reread from file system. If
False, repeated calls to this function will
return cached results.
Returns:
dataframe: The CSV file loaded. Empty dataframe
if file is not present.
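Example (a sketch; the CSV path is hypothetical, following the
share/results convention):
>>> voldf = real.load_csv("share/results/volumes/geogrid_volumes.csv")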
"""
fullpath = os.path.abspath(os.path.join(self._origpath, localpath))
if not os.path.exists(fullpath):
raise IOError("File not found: " + fullpath)
# Look for cached version
if localpath in self.data and not force_reread:
return self.data[localpath]
# Check the file store, append if not there
if localpath not in self.files["LOCALPATH"].values:
filerow = {
"LOCALPATH": localpath,
"FILETYPE": localpath.split(".")[-1],
"FULLPATH": fullpath,
"BASENAME": os.path.split(localpath)[-1],
}
self.files = pd.concat(
[self.files, pd.DataFrame([filerow])], ignore_index=True
)
try:
# Trust that Pandas will determine sensible datatypes
# faster than the convert_numeric() function
dtype = None if convert_numeric else str
dframe = pd.read_csv(fullpath, dtype=dtype)
if "REAL" in dframe:
dframe.rename(columns={"REAL": "REAL_ORIG"}, inplace=True)
logger.warning(
(
"Loaded file %s already had the column REAL, "
"this was renamed to REAL_ORIG"
),
fullpath,
)
except pd.errors.EmptyDataError:
dframe = None # or empty dataframe?
# Store parsed data:
self.data[localpath] = dframe
return dframe
def load_status(self):
"""Collects the contents of the STATUS files and return
as a dataframe, with information from jobs.json added if
available.
Each row in the dataframe is a finished FORWARD_MODEL
The STATUS files are parsed and information is extracted.
Job duration is calculated, but jobs above 24 hours
get incorrect durations.
Returns:
A dataframe with information from the STATUS files.
Each row represents one job in one of the realizations.
"""
statusfile = os.path.join(self._origpath, "STATUS")
if not os.path.exists(statusfile):
# This should not happen as long as __init__ requires STATUS
# to be present.
return pd.DataFrame() # will be empty
errorcolumns = ["error" + str(x) for x in range(0, 10)]
status = pd.read_csv(
statusfile,
sep=r"\s+",
skiprows=1,
header=None,
names=["FORWARD_MODEL", "colon", "STARTTIME", "dots", "ENDTIME"]
+ errorcolumns,
dtype=str,
engine="python",
on_bad_lines="skip",
)
status.fillna("", inplace=True)
errorjobs = status[errorcolumns[0]] != ""
# Merge any error strings:
error_string = (
status.loc[errorjobs, errorcolumns]
.astype(str)
.apply(" ".join, axis=1)
.apply(str.strip)
)
status["errorstring"] = pd.NA
status.loc[errorjobs, "errorstring"] = error_string
status.drop(errorcolumns, axis=1, inplace=True)
# Delete potential unwanted row
status = status[~((status.FORWARD_MODEL == "LSF") & (status.colon == "JOBID:"))]
if status.empty:
logger.warning("No parseable data in STATUS")
self.data["STATUS"] = status
return status
status = status.reset_index().drop("colon", axis=1).drop("dots", axis=1)
# Index the jobs, this makes it possible to match with jobs.json:
status.insert(0, "JOBINDEX", status.index.astype(int))
status = status.drop("index", axis=1)
# Calculate duration. time.fromisoformat() requires Python 3.7+, so parse manually.
# Warning: Unpandaic code..
durations = []
for _, jobrow in status.iterrows():
if not jobrow["ENDTIME"]: # A job that is not finished.
durations.append(np.nan)
else:
try:
hms = list(map(int, jobrow["STARTTIME"].split(":")))
start = datetime.combine(
date.today(), time(hour=hms[0], minute=hms[1], second=hms[2])
)
hms = list(map(int, jobrow["ENDTIME"].split(":")))
end = datetime.combine(
date.today(), time(hour=hms[0], minute=hms[1], second=hms[2])
)
# This works also when we have crossed 00:00:00.
# Jobs > 24 h will be wrong.
durations.append((end - start).seconds)
except ValueError:
# We get here if STARTTIME.split(':') does not contain
# integers only:
durations.append(np.nan)
except IndexError:
# We get here if a clock time string is invalid, e.g. missing seconds.
durations.append(np.nan)
status["DURATION"] = durations
# Augment data from jobs.json if that file is available:
jsonfilename = os.path.join(self._origpath, "jobs.json")
if jsonfilename and os.path.exists(jsonfilename):
try:
with open(jsonfilename) as file_handle:
jobsinfo = json.load(file_handle)
jobsinfodf = pd.DataFrame(jobsinfo["jobList"])
jobsinfodf["JOBINDEX"] = jobsinfodf.index.astype(int)
# Outer merge means that we will also have jobs from
# jobs.json that have not started (failed, or perhaps
# the jobs are still running on the cluster)
status = status.merge(jobsinfodf, how="outer", on="JOBINDEX")
except ValueError:
logger.warning("Parsing file %s failed, skipping", jsonfilename)
status.sort_values(["JOBINDEX"], ascending=True, inplace=True)
self.data["STATUS"] = status
return status
def apply(self, callback, **kwargs):
"""Callback functionality
A function handle can be supplied which will be executed on
this realization. The function supplied *must* return
a Pandas DataFrame. The function can accept an additional
kwargs dictionary with extra information. Special keys
in the kwargs data are 'realization', which will hold
the current realization object. The key 'localpath' is
also reserved for the use inside this apply(), as it
is used for the name of the internalized data.
If the key 'dumptodisk' is a boolean and set to True,
an attempt is also made to write the resulting dataframe
to disk using the supplied 'localpath'.
Args:
**kwargs (dict): supplied to the callback function,
in which the key 'localpath' also determines the name
used for data internalization.
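Example (a sketch; the callback and the CSV path are hypothetical):
>>> def parse_vols(kwargs):
... realization = kwargs["realization"]
... return pd.read_csv(os.path.join(realization.runpath(), "rms/output/vols.csv"))
>>> real.apply(parse_vols, localpath="share/results/tables/vols.csv")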
"""
if not kwargs:
kwargs = {}
if "realization" in kwargs:
raise ValueError("Never supply realization= to apply()")
kwargs["realization"] = self
# Allow for calling functions which cannot take any
# arguments:
try:
result = callback(kwargs) # lgtm [py/call/wrong-arguments]
except TypeError:
result = callback()
if not isinstance(result, pd.DataFrame):
raise ValueError(
"Returned value from applied " + "function must be a dataframe"
)
# Only internalize if 'localpath' is given
if "localpath" in kwargs:
self.data[kwargs["localpath"]] = result
if "dumptodisk" in kwargs and kwargs["dumptodisk"]:
if not kwargs["localpath"]:
raise ValueError(
"localpath must be supplied when dumptodisk is used"
)
fullpath = os.path.join(self.runpath(), kwargs["localpath"])
if not os.path.exists(os.path.dirname(fullpath)):
os.makedirs(os.path.dirname(fullpath))
if os.path.exists(fullpath):
os.unlink(fullpath)
logger.info("Writing result of function call to %s", fullpath)
result.to_csv(fullpath, index=False)
return result
def __getitem__(self, localpath):
"""Direct access to the realization data structure
Calls get_df(localpath).
"""
return self.get_df(localpath)
def __delitem__(self, localpath):
"""Deletes components in the internal datastore.
Silently ignores data that is not found.
Args:
localpath: string, fully qualified name of key
(no shorthand as for get_df())
"""
if localpath in self.keys():
del self.data[localpath]
def keys(self):
"""Access the keys of the internal data structure"""
return self.data.keys()
def get_df(self, localpath, merge=None):
"""Access the internal datastore which contains dataframes or dicts
or scalars.
The localpath argument can be shortened, as it will be
looked up using the function shortcut2path()
Args:
localpath (str): the identifier of the data requested
merge (list or str): identifier/localpath of some data to be merged in,
typically 'parameters.txt'. Will only work when return type is a
dataframe. If list is supplied, order can matter.
Returns:
dataframe or dictionary.
Raises:
KeyError if data is not found.
TypeError if data in localpath or merge is not of a mergeable type
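Example (a sketch, assuming yearly summary data and parameters.txt
have already been internalized):
>>> smry = real.get_df("unsmry--yearly")
>>> smry_with_params = real.get_df("unsmry--yearly", merge="parameters.txt")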
"""
fullpath = shortcut2path(self.keys(), localpath)
if fullpath not in self.data.keys():
raise KeyError("Could not find {}".format(localpath))
data = self.data[shortcut2path(self.keys(), localpath)]
if not isinstance(merge, list):
merge = [merge] # can still be None
if merge and merge[0] is not None:
# Strange things can happen when we do merges since
# this function happily returns references to the internal
# dataframes in the realization object. So ensure
# we copy dataframes if any merging is about to happen.
if isinstance(data, (pd.DataFrame, dict)):
data = data.copy()
elif isinstance(data, (str, int, float, np.number)):
# Convert scalar data into something mergeable
value = data
data = {localpath: value}
else:
raise TypeError(
"Don't know how to merge data "
+ "from {} of type {}".format(localpath, type(data))
)
for mergekey in merge:
if mergekey is None:
continue
mergedata = self.get_df(mergekey)
if isinstance(mergedata, dict):
for key in mergedata:
# Add a column to the data for each dictionary
# key:
data[key] = mergedata[key]
elif isinstance(mergedata, (str, int, float, np.number)):
# Scalar data, use the mergekey as column
data[mergekey] = mergedata
elif isinstance(mergedata, pd.DataFrame):
data = pd.merge(data, mergedata)
# pd.MergeError will be raised here when this fails,
# there must be common columns for this operation.
else:
raise TypeError(
"Don't know how to merge data "
+ "from {} of type {}".format(mergekey, type(data))
)
return data
def find_files(self, paths, metadata=None, metayaml=False):
"""Discover realization files. The files dataframe
will be updated.
Certain functionality requires up-front file discovery,
e.g. ensemble archiving and ensemble arithmetic.
CSV files for single use do not have to be discovered.
Files containing double-dashes '--' indicate that the double
dashes separate different components with meaning in the
filename. The components are extracted and put into
additional columns "COMP1", "COMP2", etc..
Filetype extension (after the last dot) will be removed
from the last component.
Args:
paths: str or list of str with filenames (will be globbed)
that are relative to the realization directory.
metadata: dict with metadata to assign for the discovered
files. The keys will be columns, and its values will be
assigned as column values for the discovered files.
During rediscovery of files, old metadata will be removed.
metayaml: Additional possibility of adding metadata from
associated yaml files. Yaml files to be associated to
a specific discovered file can have an optional dot in
front, and must end in .yml or .yaml, appended to the discovered filename.
The yaml file will be loaded as a dict, and have its keys
flattened using the separator '--'. Flattened keys are
then used as column headers in the returned dataframe.
Returns:
A slice of the internalized dataframe corresponding
to the discovered files (will be included even if it has
been discovered earlier)
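Example (a sketch; the glob pattern and metadata are hypothetical):
>>> discovered = real.find_files(
... "share/results/maps/*.gri", metadata={"CONTENT": "depthsurface"}
... )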
"""
if isinstance(paths, str):
paths = [paths]
returnedslice = pd.DataFrame(
columns=["FULLPATH", "FILETYPE", "LOCALPATH", "BASENAME"]
)
for searchpath in paths:
globs = [
f
for f in glob.glob(os.path.join(self._origpath, searchpath))
if os.path.isfile(f)
]
for match in globs:
absmatch = os.path.abspath(match)
dirname = os.path.dirname(absmatch)
basename = os.path.basename(match)
filetype = match.split(".")[-1]
filerow = {
"LOCALPATH": os.path.relpath(match, self._origpath),
"FILETYPE": filetype,
"FULLPATH": absmatch,
"BASENAME": basename,
}
# Look for and split basename based on double-dash '--'
basename_noext = basename.replace("." + filetype, "")
if "--" in basename_noext:
for compidx, comp in enumerate(basename_noext.split("--")):
filerow["COMP" + str(compidx + 1)] = comp
if metayaml:
metadict = {}
yaml_candidates = [
"." + basename + ".yml",
basename + ".yml",
"." + basename + ".yaml",
basename + ".yaml",
]
# We will only parse the first one found! You
# might be out of luck if you have multiple..
for cand in yaml_candidates:
if os.path.exists(os.path.join(dirname, cand)):
with open(os.path.join(dirname, cand)) as file_handle:
metadict = yaml.full_load(file_handle)
break
# Flatten metadict:
metadict = flatten(metadict, sep="--")
for key, value in metadict.items():
if key not in filerow:
filerow[key] = value
else:
logger.warning(
"Cannot add key %s from yaml, key is in use. Skipping.",
key,
)
# Delete this row if it already exists, determined by FULLPATH
if absmatch in self.files["FULLPATH"].values:
self.files = self.files[self.files["FULLPATH"] != absmatch]
if metadata:
filerow.update(metadata)
self.files = pd.concat(
[self.files, pd.DataFrame([filerow])], ignore_index=True
)
returnedslice = pd.concat(
[returnedslice, pd.DataFrame([filerow])], ignore_index=True
)
return returnedslice
@property
def parameters(self):
"""Access the data obtained from parameters.txt
Returns:
dict with data from parameters.txt
"""
return self.data["parameters.txt"]
def get_eclfiles(self):
"""
get_eclfiles is deprecated as ecl2df has been renamed to res2df.
Use the function get_resdatafiles together with res2df instead.
"""
if not HAVE_ECL2DF:
logger.warning("ecl2df not installed. Skipping")
return None
data_file_row = self.files[self.files["FILETYPE"] == "DATA"]
data_filename = None
if len(data_file_row) == 1:
data_filename = data_file_row["FULLPATH"].values[0]
elif self._autodiscovery:
data_fileguess = os.path.join(self._origpath, "eclipse/model", "*.DATA")
data_filenamelist = glob.glob(data_fileguess)
if not data_filenamelist:
return None # No filename matches *DATA
if len(data_filenamelist) > 1:
logger.warning(
(
"Multiple DATA files found, "
"consider turning off auto-discovery"
)
)
data_filename = data_filenamelist[0]
self.find_files(data_filename)
else:
# There is no DATA file to be found.
logger.warning("No DATA file found!")
return None
if not os.path.exists(data_filename):
return None
return ecl2df.EclFiles(data_filename)
def get_resdatafiles(self):
"""
Return a res2df.ResdataFiles object to connect to the res2df package.
If autodiscovery is enabled, it will search for a DATA file in
the standard location eclipse/model/*.DATA.
If you have multiple DATA files, you must discover
the one you need explicitly before calling this function, example:
>>> real = ScratchRealization("myrealpath")
>>> real.find_files("eclipse/model/MYMODELPREDICTION.DATA")
Returns:
res2df.ResdataFiles. None if nothing found
"""
if not HAVE_RES2DF:
logger.warning("res2df not installed. Skipping")
return None
data_file_row = self.files[self.files["FILETYPE"] == "DATA"]
data_filename = None
if len(data_file_row) == 1:
data_filename = data_file_row["FULLPATH"].values[0]
elif self._autodiscovery:
data_fileguess = os.path.join(self._origpath, "eclipse/model", "*.DATA")
data_filenamelist = glob.glob(data_fileguess)
if not data_filenamelist:
return None # No filename matches *DATA
if len(data_filenamelist) > 1:
logger.warning(
(
"Multiple DATA files found, "
"consider turning off auto-discovery"
)
)
data_filename = data_filenamelist[0]
self.find_files(data_filename)
else:
# There is no DATA file to be found.
logger.warning("No DATA file found!")
return None
if not os.path.exists(data_filename):
return None
return res2df.ResdataFiles(data_filename)
def get_eclsum(self, cache=True, include_restart=True):
"""
Fetch the Eclipse Summary file from the realization
and return it as a resdata.summary.Summary object.
Unless the UNSMRY file has been discovered, it will
pick the file from the glob `eclipse/model/*UNSMRY`,
as long as autodiscovery is not turned off when
the realization object was initialized.
If you have multiple UNSMRY files in eclipse/model
turning off autodiscovery is strongly recommended.
Arguments:
cache: boolean indicating whether we should keep an
object reference to the EclSum object. Set to
false if you need to conserve memory.
include_restart: boolean sent to resdata for whether restart
files should be traversed.
Returns:
Summary: object representing the summary file. None if
nothing was found.
"""
# Return cached object if available
if cache and self._eclsum and self._eclsum_include_restart == include_restart:
return self._eclsum
unsmry_file_row = self.files[self.files.FILETYPE == "UNSMRY"]
unsmry_filename = None
if len(unsmry_file_row) == 1:
unsmry_filename = unsmry_file_row.FULLPATH.values[0]
elif self._autodiscovery:
unsmry_fileguess = os.path.join(self._origpath, "eclipse/model", "*.UNSMRY")
unsmry_filenamelist = glob.glob(unsmry_fileguess)
if not unsmry_filenamelist:
return None # No filename matches
if len(unsmry_filenamelist) > 1:
logger.warning(
"Multiple UNSMRY files found, consider turning off auto-discovery"
)
unsmry_filename = unsmry_filenamelist[0]
self.find_files(unsmry_filename)
else:
# There is no UNSMRY file to be found.
return None
if not os.path.exists(unsmry_filename):
return None
try:
eclsum = Summary(
unsmry_filename, lazy_load=False, include_restart=include_restart
)
except IOError:
# This can happen if there is something wrong with the file
# or if SMSPEC is missing.
logger.warning("Failed to create summary instance from %s", unsmry_filename)
return None
if cache:
self._eclsum = eclsum
self._eclsum_include_restart = include_restart
return eclsum
def load_smry(
self,
time_index="raw",
column_keys=None,
cache_eclsum=None,
start_date=None,
end_date=None,
include_restart=True,
):
"""Produce dataframe from Summary data from the realization
When this function is called, the dataframe will be
internalized. Internalization of summary data in a
realization object supports different time_index, but there is
no handling of multiple sets of column_keys. The cached data
will be called
'share/results/tables/unsmry--<time_index>.csv'
where <time_index> is among 'yearly', 'monthly', 'daily', 'first',
'last' or 'raw' (meaning the raw dates in the SMRY file), depending
on the chosen time_index. If a custom time_index (list
of datetime) was supplied, <time_index> will be called 'custom'.
Wraps resdata.summary.Summary.pandas_frame()
See also get_smry()
Args:
time_index: string indicating a resampling frequency,
'yearly', 'monthly', 'daily', 'first', 'last' or 'raw', the
latter will return the simulated report steps (also default).
If a list of DateTime is supplied, data will be resampled
to these.
column_keys: list of column key wildcards. None means everything.
cache_eclsum: boolean for whether to keep the loaded EclSum
object in memory after data has been loaded.
start_date: str or date with first date to include.
Dates prior to this date will be dropped, supplied
start_date will always be included. Overridden if time_index
is 'first' or 'last'.
end_date: str or date with last date to be included.
Dates past this date will be dropped, supplied
end_date will always be included. Overridden if time_index
is 'first' or 'last'.
include_restart: boolean sent to resdata for whether restart
files should be traversed.
Returns:
DataFrame: with summary keys as columns and dates as indices.
Empty dataframe if no summary is available or the column
keys do not exist.
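Example (a sketch; the summary vector names are illustrative):
>>> smry = real.load_smry(column_keys=["FOPT", "FWPT"], time_index="yearly")
>>> cached = real.get_df("unsmry--yearly")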
"""
if cache_eclsum is not None:
warnings.warn(
(
"cache_eclsum option to load_smry() is deprecated and "
"will be removed in fmu-ensemble v2.0.0"
),
FutureWarning,
)
else:
cache_eclsum = True
if not self.get_eclsum(cache=cache_eclsum):
# Return empty, but do not store the empty dataframe in self.data
return pd.DataFrame()
time_index_path = time_index
if time_index == "raw":
time_index_arg = None
elif isinstance(time_index, str):
# Note: This call will recache the smry object.
time_index_arg = self.get_smry_dates(
freq=time_index,
start_date=start_date,
end_date=end_date,
include_restart=include_restart,
)
elif isinstance(time_index, (list, np.ndarray)):
time_index_arg = time_index
time_index_path = "custom"
elif time_index is None:
time_index_path = "raw"
time_index_arg = time_index
else:
raise TypeError("'time_index' has to be a string, a list or None")
if not isinstance(column_keys, list):
column_keys = [column_keys]
# Do the actual work:
dframe = self.get_eclsum(
cache=cache_eclsum, include_restart=include_restart
).pandas_frame(time_index_arg, column_keys)
dframe = dframe.reset_index()
dframe.rename(columns={"index": "DATE"}, inplace=True)
# Cache the result:
localpath = "share/results/tables/unsmry--" + time_index_path + ".csv"
self.data[localpath] = dframe
# Do this to ensure that we cut the rope to the EclSum object
# Can be critical for garbage collection
if not cache_eclsum:
self._eclsum = None
return dframe
def get_smry(
self,
time_index=None,
column_keys=None,
cache_eclsum=None,
start_date=None,
end_date=None,
include_restart=True,
):
"""Wrapper for EclSum.pandas_frame
This gives access to the underlying data on disk without
touching internalized dataframes.
Arguments:
time_index: string indicating a resampling frequency,
'yearly', 'monthly', 'daily', 'first', 'last' or 'raw', the
latter will return the simulated report steps (also default).
If a list of DateTime is supplied, data will be resampled
to these. If a date in ISO-8601 format is supplied, that is
used as a single date.
column_keys: list of column key wildcards. None means everything.
cache_eclsum: boolean for whether to keep the loaded EclSum
object in memory after data has been loaded.
start_date: str or date with first date to include.
Dates prior to this date will be dropped, supplied
start_date will always be included. Overridden if time_index
is 'first' or 'last'.
end_date: str or date with last date to be included.
Dates past this date will be dropped, supplied
end_date will always be included. Overridden if time_index
is 'first' or 'last'.
Returns an empty dataframe if there is no summary file, or if
the column_keys do not exist.
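Example (a sketch; the vector name is illustrative):
>>> daily = real.get_smry(column_keys="FOPR", time_index="daily")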
"""
if cache_eclsum is not None:
warnings.warn(
(
"cache_eclsum option to get_smry() is deprecated and "
"will be removed in fmu-ensemble v2.0.0"
),
FutureWarning,
)
else:
cache_eclsum = True
if not isinstance(column_keys, list):
column_keys = [column_keys]
if isinstance(time_index, str) and time_index == "raw":
time_index_arg = None
elif isinstance(time_index, str):
try:
parseddate = dateutil.parser.isoparse(time_index)
time_index_arg = [parseddate]
except ValueError:
time_index_arg = self.get_smry_dates(
freq=time_index,
start_date=start_date,
end_date=end_date,
include_restart=include_restart,
)
elif time_index is None or isinstance(time_index, (list, np.ndarray)):
time_index_arg = time_index
else:
raise TypeError("'time_index' has to be a string, a list or None")
if self.get_eclsum(cache=cache_eclsum, include_restart=include_restart):
try:
dataframe = self.get_eclsum(
cache=cache_eclsum, include_restart=include_restart
).pandas_frame(time_index_arg, column_keys)
except ValueError:
# We get here if we have requested non-existing column keys
return pd.DataFrame()
if not cache_eclsum:
# Ensure EclSum object can be garbage collected
self._eclsum = None
return dataframe
return pd.DataFrame()
def _glob_smry_keys(self, column_keys):
"""Utility function for globbing column names
Use this to expand 'F*' to the list of Eclipse summary
vectors matching.
Args:
column_keys: str or list of strings with patterns
Returns:
list of strings. Empty list if no summary loaded.
"""
if self.get_eclsum() is None:
logger.warning(
(
"Calling _glob_smry_keys without loaded or found summary file "
"returns empty list"
)
)
return []
if not isinstance(column_keys, list):
column_keys = [column_keys]
keys = set()
for key in column_keys:
if isinstance(key, str):
keys = keys.union(set(self._eclsum.keys(key)))
return list(keys)
def get_volumetric_rates(self, column_keys=None, time_index=None, time_unit=None):
"""Compute volumetric rates from cumulative summary vectors
See :meth:`fmu.ensemble.util.compute_volumetric_rates`
"""
return compute_volumetric_rates(self, column_keys, time_index, time_unit)
def get_smryvalues(self, props_wildcard=None):
"""
Fetch selected vectors from Eclipse Summary data.
Args:
props_wildcard : string or list of strings with vector
wildcards
Returns:
a dataframe with values. Raw times from UNSMRY.
Empty dataframe if no summary file data available
"""
warnings.warn(
(
"realization.get_smryvalues() is deprecated and "
"will be removed in fmu-ensemble v2.0.0. Process "
"columns from get_smry() instead."
),
FutureWarning,
)
if not self._eclsum: # check if it is cached
self.get_eclsum()
if not self._eclsum:
return pd.DataFrame()
props = self._glob_smry_keys(props_wildcard)
if "numpy_vector" in dir(self._eclsum):
data = {
prop: self._eclsum.numpy_vector(prop, report_only=False)
for prop in props
}
else: # get_values() is deprecated in resdata
data = {
prop: self._eclsum.get_values(prop, report_only=False) for prop in props
}
dates = self._eclsum.get_dates(report_only=False)
return pd.DataFrame(data=data, index=dates)
def get_smry_dates(
self,
freq="monthly",
normalize=True,
start_date=None,
end_date=None,
include_restart=True,
):
"""Return list of datetimes available in the realization
Args:
freq: string denoting requested frequency for
the returned list of datetime. 'report' will
yield the sorted union of all valid timesteps for
this realization. Other valid options are
'daily', 'weekly', 'monthly' and 'yearly'.
'first' will give out the first date (minimum) and
'last' will give out the last date (maximum),
both as lists with one element.
normalize: Whether to normalize backwards at the start
and forwards at the end to ensure the raw
date range is covered.
start_date: str or date with first date to include
Dates prior to this date will be dropped, supplied
start_date will always be included. Overrides
normalized dates. Overridden if freq is 'first'
or 'last'.
end_date: str or date with last date to be included.
Dates past this date will be dropped, supplied
end_date will always be included. Overrides
normalized dates. Overridden if freq is 'first'
or 'last'.
Returns:
list of datetimes. None if no summary data is available.
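Example (a sketch):
>>> dates = real.get_smry_dates(freq="yearly", normalize=True)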
"""
eclsum = self.get_eclsum(include_restart=include_restart)
if not eclsum:
return None
return unionize_smry_dates(
[eclsum.dates], freq, normalize, start_date, end_date
)
def contains(self, localpath, **kwargs):
"""Boolean function for asking the realization for presence
of certain data types and possibly data values.
Args:
localpath: string pointing to the data for which the query
applies. If no other arguments are given, presence of
this data key is the only criterion.
key: A certain key within a realization dictionary that is
required to be present. If a value is also provided, this
key must be equal to this value. If localpath is not
a dictionary, this will raise a ValueError
value: The value a certain key must equal. Floating point
comparisons are not robust. Only relevant for dictionaries
column: Name of a column in tabular data. If columncontains is
not specified, this means that this column must be present
columncontains:
A value that the specific column must include.
Returns:
boolean: True if the data is present and fulfilling any
criteria.
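Examples (sketches; key and column names are hypothetical):
>>> real.contains("parameters.txt", key="RMS_SEED")
>>> real.contains("unsmry--yearly", column="FOPT")
>>> real.contains("unsmry--yearly", column="DATE", columncontains="2020-01-01")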
"""
kwargs.pop("inplace", 0)
localpath = shortcut2path(self.keys(), localpath)
if localpath not in self.keys():
return False
if not kwargs:
return localpath in self.keys()
if (
isinstance(self.data[localpath], dict)
and "key" in kwargs
and "value" not in kwargs
):
return kwargs["key"] in self.data[localpath]
if isinstance(self.data[localpath], pd.DataFrame):
if "key" in kwargs:
raise ValueError("Don't use key for tabular data")
if "value" in kwargs:
raise ValueError("Don't use value for tabular data")
if "column" in kwargs and "columncontains" not in kwargs:
# Only asking for column presence
return kwargs["column"] in self.data[localpath].columns
if "column" in kwargs and "columncontains" in kwargs:
# If we are dealing with the DATE column,
# convert everything to pandas datatime64 for comparisons,
# otherwise we revert to simpler check.
if kwargs["column"] == "DATE":
return (
pd.to_datetime(dateutil.parser.parse(kwargs["columncontains"]))
== pd.to_datetime(self.data[localpath][kwargs["column"]])
).any()
return (
kwargs["columncontains"]
in self.data[localpath][kwargs["column"]].values
)
if "key" in kwargs and "value" in kwargs:
if isinstance(kwargs["value"], str):
if kwargs["key"] in self.data[localpath]:
return str(self.data[localpath][kwargs["key"]]) == kwargs["value"]
return False
# non-string, then don't convert the internalized data
return self.data[localpath][kwargs["key"]] == kwargs["value"]
raise ValueError("Wrong arguments to contains()")
def drop(self, localpath, **kwargs):
"""Delete elements from internalized data.
Shortcuts are allowed for localpath. If the data pointed to is
a DataFrame, you can delete columns, or rows containing certain
elements
If the data pointed to is a dictionary, keys can be deleted.
Args:
localpath: string, path to internalized data. If no other options
are supplied, that dataset is deleted in its entirety
column: string with a column name to drop. Only for dataframes
columns: list of strings with column names to delete
rowcontains: rows where one column contains this string will be
dropped. The comparison is on strings only, and all cells in
the dataframe are converted to strings for the comparison.
Thus it might work on dates, but be careful with numbers.
key: string with a keyname in a dictionary. Will not work for
dataframes
keys: list of strings of keys to delete from a dictionary
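Examples (sketches; column and key names are hypothetical):
>>> real.drop("unsmry--yearly", column="FWCT")
>>> real.drop("parameters.txt", key="RMS_SEED")
>>> real.drop("unsmry--yearly")  # removes the entire dataset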
"""
fullpath = shortcut2path(self.keys(), localpath)
if fullpath not in self.keys():
raise ValueError("%s not found" % localpath)
data = self.data[fullpath]
if not kwargs:
# This will remove the entire dataset
self.data.pop(fullpath, None)
if isinstance(data, pd.DataFrame):
if "column" in kwargs:
data.drop(labels=kwargs["column"], axis="columns", inplace=True)
if "columns" in kwargs:
data.drop(labels=kwargs["columns"], axis="columns", inplace=True)
if "rowcontains" in kwargs:
# Construct boolean series for those rows that have a match
boolseries = (data.astype(str) == str(kwargs["rowcontains"])).any(
axis="columns"
)
self.data[fullpath] = data[~boolseries]
if isinstance(data, dict):
if "keys" in kwargs:
for key in kwargs["keys"]:
data.pop(key, None)
if "key" in kwargs:
data.pop(kwargs["key"], None)
def __repr__(self):
"""Represent the realization. Show only the last part of the path"""
pathsummary = self._origpath[-50:]
indexstr = str(self.index) if self.index is not None else "Error"
return "<Realization, index={}, path=...{}>".format(indexstr, pathsummary)
def __sub__(self, other):
"""Substract another realization from this"""
result = RealizationCombination(ref=self, sub=other)
return result
def __add__(self, other):
"""Add another realization from this"""
result = RealizationCombination(ref=self, add=other)
return result
def __mul__(self, other):
"""Scale this realization by a scalar value"""
result = RealizationCombination(ref=self, scale=float(other))
return result
def __rsub__(self, other):
"""Add another realization from this"""
result = RealizationCombination(ref=self, sub=other)
return result
def __radd__(self, other):
"""Substract another realization from this"""
result = RealizationCombination(ref=self, add=other)
return result
def __rmul__(self, other):
"""Scale this realization by a scalar value"""
result = RealizationCombination(ref=self, scale=float(other))
return result
def get_init(self):
"""
:returns: init file of the realization.
"""
warnings.warn(
(
"realization.get_init() is deprecated and "
"will be removed in later versions."
),
FutureWarning,
)
init_file_row = self.files[self.files.FILETYPE == "INIT"]
init_filename = None
if len(init_file_row) == 1:
init_filename = init_file_row.FULLPATH.values[0]
else:
init_fileguess = os.path.join(self._origpath, "eclipse/model", "*.INIT")
init_filenamelist = glob.glob(init_fileguess)
if not init_filenamelist:
return None # No filename matches
init_filename = init_filenamelist[0]
if not os.path.exists(init_filename):
return None
if not self._eclinit:
self._eclinit = ResdataFile(init_filename, flags=FileMode.CLOSE_STREAM)
return self._eclinit
def get_unrst(self):
"""
:returns: restart file of the realization.
"""
warnings.warn(
(
"realization.get_unrst() is deprecated and "
"will be removed in later versions."
),
FutureWarning,
)
unrst_file_row = self.files[self.files.FILETYPE == "UNRST"]
unrst_filename = None
if len(unrst_file_row) == 1:
unrst_filename = unrst_file_row.FULLPATH.values[0]
else:
unrst_fileguess = os.path.join(self._origpath, "eclipse/model", "*.UNRST")
unrst_filenamelist = glob.glob(unrst_fileguess)
if not unrst_filenamelist:
return None # No filename matches
unrst_filename = unrst_filenamelist[0]
if not os.path.exists(unrst_filename):
return None
if not self._eclunrst:
self._eclunrst = ResdataFile(unrst_filename, flags=FileMode.CLOSE_STREAM)
return self._eclunrst
def get_grid_index(self, active_only):
"""
Return the grid index in a pandas dataframe.
"""
warnings.warn(
(
"realization.get_grid_index() is deprecated and "
"will be removed in later versions."
),
FutureWarning,
)
if self.get_grid():
return self.get_grid().export_index(active_only=active_only)
logger.warning("No GRID file in realization %s", self)
def get_grid_corners(self, grid_index):
"""Return a dataframe with the x, y, z for the
8 grid corners of corner point cells"""
warnings.warn(
(
"realization.get_grid_corners() is deprecated and "
"will be removed in later versions."
),
FutureWarning,
)
if self.get_grid():
corners = self.get_grid().export_corners(grid_index)
columns = [
"x1",
"y1",
"z1",
"x2",
"y2",
"z2",
"x3",
"y3",
"z3",
"x4",
"y4",
"z4",
"x5",
"y5",
"z5",
"x6",
"y6",
"z6",
"x7",
"y7",
"z7",
"x8",
"y8",
"z8",
]
return pd.DataFrame(data=corners, columns=columns)
else:
logger.warning("No GRID file in realization %s", self)
def get_grid_centre(self, grid_index):
"""Return the grid centre of corner-point-cells, x, y and z
in distinct columns"""
warnings.warn(
(
"realization.get_grid_centre() is deprecated and "
"will be removed in later versions."
),
FutureWarning,
)
if self.get_grid():
grid_cell_centre = self.get_grid().export_position(grid_index)
return pd.DataFrame(
data=grid_cell_centre, columns=["cell_x", "cell_y", "cell_z"]
)
else:
logger.warning("No GRID file in realization %s", self)
def get_grid(self):
"""
:returns: grid file of the realization.
"""
warnings.warn(
(
"realization.get_grid() is deprecated and "
"will be removed in later versions."
),
FutureWarning,
)
grid_file_row = self.files[self.files.FILETYPE == "EGRID"]
grid_filename = None
if len(grid_file_row) == 1:
grid_filename = grid_file_row.FULLPATH.values[0]
else:
grid_fileguess = os.path.join(self._origpath, "eclipse/model", "*.EGRID")
grid_filenamelist = glob.glob(grid_fileguess)
if not grid_filenamelist:
return None # No filename matches
grid_filename = grid_filenamelist[0]
if not os.path.exists(grid_filename):
return None
if not self._eclgrid:
self._eclgrid = Grid(grid_filename)
return self._eclgrid
@property
def global_size(self):
"""
:returns: Number of cells in the realization.
"""
warnings.warn(
(
"realization.get_grid() is deprecated and "
"will be removed in later versions."
),
FutureWarning,
)
if self.get_grid() is not None:
return self.get_grid().get_global_size()
@property
def actnum(self):
"""
:returns: EclKw of ints showing which cells are active,
Active cells are given value 1, while
inactive cells have value 0.
"""
warnings.warn(
(
"realization.get_grid() is deprecated and "
"will be removed in later versions."
),
FutureWarning,
)
if not self._actnum and self.get_init() is not None:
self._actnum = self.get_init()["PORV"][0].create_actnum()
return self._actnum
@property
def report_dates(self):
"""
:returns: List of datetime.datetime for which values are reported.
"""
warnings.warn(
(
"realization.get_grid() is deprecated and "
"will be removed in later versions."
),
FutureWarning,
)
if self.get_unrst() is not None:
return self.get_unrst().report_dates
def get_global_init_keyword(self, prop):
"""
:param prop: A name of a keyword in the realization's init file.
:returns: The EclKw of given name. Length is global_size.
non-active cells are given value 0.
"""
warnings.warn(
(
"realization.get_global_init_keyword() is deprecated and "
"will be removed in later versions."
),
FutureWarning,
)
if self.get_init() is not None:
return self.get_init()[prop][0].scatter_copy(self.actnum)
def get_global_unrst_keyword(self, prop, report):
"""
:param prop: A name of a keyword in the realization's restart file.
:param report: Index of the report step to fetch.
:returns: The EclKw of given name. Length is global_size.
non-active cells are given value 0.
"""
warnings.warn(
(
"realization.get_global_unrst_keyword() is deprecated and "
"will be removed in later versions."
),
FutureWarning,
)
if self.get_unrst() is not None:
return self.get_unrst()[prop][report].scatter_copy(self.actnum)