Source code for fmu.tools.ensembles.ensemble_well_props

"""Compute well properties/stats along a (planned) well path based on ensemble data.

The implementation is through a console script, which reads a YAML config file.

Output will be RMS well file and/or CSV files, in addition to screen info.
"""

import argparse
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Sequence, no_type_check

import pandas as pd
import xtgeo
import yaml

GCELLNAMES = ["ICELL", "JCELL", "KCELL"]


@dataclass
class ScreenInfo:
    """Simple helper controlling screen output verbosity."""

    quiet: bool = False
    moreverbose: bool = False

    def oprint(self, *args):
        """Ordinary print info for users."""
        if not self.quiet:
            print(">>", *args)

    def cprint(self, *args):
        """Ordinary but clean print without leading >>, useful for e.g. dataframes."""
        if not self.quiet:
            print("\n", *args)

    def xprint(self, *args):
        """Extra print if more verbose output is asked for."""
        if not self.quiet and self.moreverbose:
            print(">x", *args)


def get_parser_args(args):
    """Set up parser for command line end point."""
    if args is None:
        args = sys.argv[1:]

    parser = argparse.ArgumentParser(
        description="Compute ensemble properties along a given well.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument("--config", "-c", type=str, help="Config file in YAML format.")
    parser.add_argument(
        "--example", action="store_true", help="Dump example config file as template"
    )
    parser.add_argument(
        "--verbose", "-v", action="store_true", help="Be more verbose to screen"
    )
    parser.add_argument(
        "--quiet", "-q", action="store_true", help="Silence all screen messages"
    )

    myargs = parser.parse_args(args)

    if len(args) < 1:
        parser.print_help()
        sys.exit(0)

    return myargs


def dump_example_config():
    """Dump an example config file as example.yml."""
    example = """
    ensemble:
      # root to FMU run/case
      root: /scratch/fmu/nn/01_drogon_ahm

      realizations:
        # specify as range (both ends included) OR as entries
        range: 0-30
        # entries: [0, 4, 5, 7, 13]

      # full name of iteration folder, e.g. 'iter-2' or 'pred'
      iteration: iter-0

    well:
      # path (full or relative) to well (which must be in RMS ascii well format)
      file: /some/path/to/mywell.w
      # lognames to import (optional, to speed up, default is "all" which can be slow)
      # a recommendation is to ONLY include the required MD log
      lognames: ["PHIT", "MD"]
      # name of MDepth log in file (a measured depth log is required)
      mdlog: MD
      # delta = sampling rate, default is 2 (if metric units: means 2 meters)
      # A large delta will speed up run but decrease accuracy
      delta: 1
      # which MD ranges to evaluate, list in list where inner is [min, max]
      mdranges: [[1690, 1710], [1730, 1830]]

    gridproperties:
      grid:
        # filestub is path after 'root / "realization-N" / iteration'
        filestub: share/results/grids/geogrid.roff
        # if grid is without structural uncertainty => reuse: True
        reuse: false

      # list of properties to analyse with codes (discrete, e.g. facies) or
      # range interval; the final report will combine all these criteria!
      # The filestub is a relative path to the ensemble run
      properties:
        - name: Facies
          filestub: share/results/grids/geogrid--facies.roff
          discrete: true
        - name: PHIT
          filestub: share/results/grids/geogrid--poro.roff
        - name: PERM
          filestub: share/results/grids/geogrid--perm.roff

    report:
      # produce a RMS ascii well file with average logs (can be imported to RMS)
      average_logs:
        fileroot: some

      # report cumulative length statistics on CSV format; note that multiple criteria
      # are possible and the result will be an "AND" combination
      cumulative_lengths:
        # output will be one file for reals, and one file for statistics e.g.
        # cumlen.csv and cumlen_summary.csv
        fileroot: cumlen
        criteria:
          Facies:
            codes: [1, 3, 4]  # several numbers possible
          PHIT:
            interval: [0.22, 0.4]  # only two numbers
          PERM:
            interval: [1000, 100000]  # only two numbers

      # the fields below give an opportunity to study more details of the calculations
      keep_intermediate_logs: false
      show_in_terminal: true
    """
    example = example.split("\n")[1:]
    with open("example.yml", "w", encoding="utf-8") as stream:
        for line in example:
            line = line.replace(" " * 4, "", 1)
            stream.write(f"{line}\n")
    print("See example.yml, and use this as template for your case!")


def parse_config(configfile):
    """Parse YAML file."""
    try:
        with open(configfile, "r", encoding="utf-8") as stream:
            config = yaml.safe_load(stream)
    except IOError:
        print(f"Could not read file: {configfile}")
        raise

    return config


@dataclass
class PropsData:
    """Holding property data and criteria."""

    name: str
    filestub: str
    discrete: bool = False
    codes: Optional[list] = None
    interval: Optional[list] = None


@dataclass
class ConfigData:
    """Other class data than config are evaluated from config in post_init."""

    config: dict

    root: Optional[Path] = None
    reals: Optional[list] = None
    itera: Optional[str] = None
    wellfile: Optional[Path] = None
    welldelta: int = 2
    gridfile: Optional[Path] = None
    gridreuse: bool = False
    proplist: Optional[List[PropsData]] = None
    report_avg_file: Optional[str] = None
    report_cum_file: Optional[str] = None
    report_keep_intermediate: bool = False
    report_show_in_terminal: bool = True

    def __post_init__(self):
        """Derive input data from config."""
        self.root = Path(self.config["ensemble"]["root"])

        if "range" in self.config["ensemble"]["realizations"]:
            start, stop = self.config["ensemble"]["realizations"]["range"].split("-")
            self.reals = list(range(int(start), int(stop) + 1))
        elif "entries" in self.config["ensemble"]["realizations"]:
            self.reals = self.config["ensemble"]["realizations"]["entries"]
        else:
            raise ValueError("For 'realizations': 'range' or 'entries' is missing")

        self.itera = self.config["ensemble"]["iteration"]

        self.wellfile = self.config["well"]["file"]
        self.lognames = self.config["well"].get("lognames", "all")
        self.mdlog = self.config["well"].get("mdlog", "MDepth")
        self.mdranges = self._validate_mdranges(self.config["well"]["mdranges"])
        self.welldelta = self.config["well"].get("delta", 2)

        cfgprop = self.config["gridproperties"]

        self.gridfilestub = Path(cfgprop["grid"]["filestub"])
        self.gridreuse = cfgprop["grid"].get("reuse", False)

        self.proplist = []
        props = cfgprop["properties"]
        for prop in props:
            name = prop["name"]
            filestub = prop["filestub"]
            discrete = prop.get("discrete", False)
            propcase = PropsData(name, filestub, discrete)
            self.proplist.append(propcase)

        rpt = self.config["report"]
        if "average_logs" in rpt:
            self.report_avg_file = rpt["average_logs"].get("fileroot", None)
            if self.report_avg_file:
                self.report_avg_file += ".rmswell"

        if "cumulative_lengths" in rpt:
            self.report_cum_file = rpt["cumulative_lengths"].get("fileroot", None)
            crit = rpt["cumulative_lengths"].get("criteria", None)
            if crit is None:
                raise ValueError("Criteria is missing for 'cumulative_lengths' option")

            for propcase in self.proplist:
                if crit and propcase.name in crit:
                    codes = crit[propcase.name].get("codes", None)
                    interval = crit[propcase.name].get("interval", None)
                    if not codes and propcase.discrete:
                        raise ValueError(
                            f"Discrete property {propcase.name} must have "
                            "'codes' as criteria"
                        )
                    if not interval and not propcase.discrete:
                        raise ValueError(
                            f"Continuous property {propcase.name} must have "
                            "'interval' as criteria"
                        )
                    propcase.codes = self._validate_codes(codes)
                    propcase.interval = self._validate_interval(interval)

        self.report_keep_intermediate = rpt.get("keep_intermediate_logs", False)
        self.report_show_in_terminal = rpt.get("show_in_terminal", True)

    @staticmethod
    def _validate_mdranges(mdranges: List[Sequence[float]]) -> List[Sequence[float]]:
        """Check that mdranges are on proper form and that intervals do not overlap."""
        if not mdranges:
            raise ValueError("Mandatory input 'mdranges' is not set!")

        previous_stop = -999999.0
        for mdsubset in mdranges:
            start, stop = mdsubset
            if not isinstance(start, (int, float)) or not isinstance(
                stop, (int, float)
            ):
                raise ValueError("Values in mdranges must be numbers")
            if stop <= start:
                raise ValueError(
                    f"Stop value in mdranges must be greater than start: {mdsubset}"
                )
            if start < previous_stop:
                raise ValueError(f"Ranges cannot be overlapping: {mdranges}")
            previous_stop = stop

        return mdranges

    @staticmethod
    def _validate_codes(codes):
        """Check that codes input looks sane."""
        if codes is None:
            return None

        if isinstance(codes, list):
            for code in codes:
                if not isinstance(code, int):
                    raise ValueError(f"Code is not integer: {code} in {codes}")
        else:
            raise ValueError("The 'codes' input is not a list")

        return codes

    @staticmethod
    def _validate_interval(interval):
        """Check that interval input looks sane."""
        if interval is None:
            return None

        if isinstance(interval, list):
            if len(interval) != 2:
                raise ValueError(
                    f"Interval list input must be 2 items exactly: {interval}"
                )
            imin, imax = interval
            if not isinstance(imin, (int, float)):
                raise ValueError(
                    f"First number in interval (minimum) is not a number: {interval}"
                )
            if not isinstance(imax, (int, float)):
                raise ValueError(
                    f"Second number in interval (maximum) is not a number: {interval}"
                )
            if imin >= imax:
                raise ValueError(f"Minimum cannot be >= maximum: {interval}")
        else:
            raise ValueError("The 'interval' is not a list")

        return interval


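# A minimal sketch (not part of the module) of how ConfigData expands the
# realization range and validates MD ranges; the config fragment below is a
# hypothetical example, not a complete working setup:
#
#   cfg = ConfigData(
#       {
#           "ensemble": {
#               "root": "/tmp",
#               "realizations": {"range": "0-3"},
#               "iteration": "iter-0",
#           },
#           "well": {"file": "well.w", "mdranges": [[1690, 1710], [1730, 1830]]},
#           "gridproperties": {"grid": {"filestub": "geogrid.roff"}, "properties": []},
#           "report": {},
#       }
#   )
#   cfg.reals     # -> [0, 1, 2, 3]; the 'range' string is inclusive at both ends
#   cfg.mdranges  # -> accepted: numeric, stop > start, and non-overlapping

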
@dataclass
class WellCase:
    well: xtgeo.Well
    mdlog: str
    mdranges: list
    delta: int = 2

    def __post_init__(self):
        """Various post init operations."""
        self.well.rescale(delta=self.delta)

        dflist = []
        for rnge in self.mdranges:
            dfrorig = self.well.dataframe.copy()
            rmin, rmax = rnge
            dfr = dfrorig[
                (dfrorig[self.mdlog] >= rmin) & (dfrorig[self.mdlog] <= rmax)
            ]
            dfr = dfr.reset_index(drop=True)
            dflist.append(dfr)

        self.well.dataframe = pd.concat(dflist, ignore_index=True)


@dataclass
class EnsembleWellProps:
    """The ensemble well props is used for post-loop analysis."""

    well: xtgeo.Well
    realizations: list
    cfg: ConfigData
    sinfo: ScreenInfo
    added_logs: Optional[dict] = None  # added log per prop name; normally tmp
    added_flag_logs: Optional[list] = None
    cumlenreport: Optional[pd.DataFrame] = None
    cumlenreport_summary: Optional[pd.DataFrame] = None

    def __post_init__(self):
        allprops = {}
        for prop in self.cfg.proplist:
            plist = []
            for real in self.realizations:
                plist.append(f"{prop.name}_r{real}")
            allprops[prop.name] = plist
        self.added_logs = allprops

    @no_type_check
    def process_ensemble_avglogs(self) -> bool:
        """Make ensemble mean or mode (most-of) logs."""
        if not self.cfg.report_avg_file:
            return False

        dfr = self.well.dataframe
        for prop in self.cfg.proplist:
            pplist = self.added_logs[prop.name]
            if not prop.discrete:
                self.well.create_log(
                    f"{prop.name}_mean",
                    logtype="CONT",
                )
                dfr[f"{prop.name}_mean"] = dfr[pplist].mean(axis=1)
            else:
                self.well.create_log(
                    f"{prop.name}_mode",
                    logtype="DISC",
                    logrecord=self.well.get_logrecord(pplist[0]),
                )
                dfr[f"{prop.name}_mode"] = dfr[pplist].mode(axis=1)[0]  # most of

        return True

    @no_type_check
    def process_ensemble_cumlen(self) -> bool:
        """Provide cumulative length statistics."""
        if not self.cfg.report_cum_file:
            return False

        dfr = self.well.dataframe

        mdstart = dfr[self.cfg.mdlog].values[0]
        mdend = dfr[self.cfg.mdlog].values[-1]

        totlensegments = 0
        nsegments = len(self.cfg.mdranges)
        for intv in self.cfg.mdranges:
            lmin, lmax = intv
            if lmax > mdend:
                lmax = mdend
            totlensegments += lmax - lmin

        myreport = []
        added_flag_logs = []
        for real in self.realizations:
            cname = f"_COMB_r{real}"
            dfr[cname] = 1
            added_flag_logs.append(cname)

            for prop in self.cfg.proplist:
                idn = f"_r{real}"
                pidn = prop.name + idn
                pidx = "q" + pidn

                if prop.codes:
                    dfr[pidx] = 0
                    for code in prop.codes:
                        dfr.loc[dfr[pidn] == code, pidx] = 1
                    added_flag_logs.append(pidx)
                    dfr.loc[dfr[pidx] != 1, cname] = 0

                if prop.interval:
                    dfr[pidx] = 0
                    vmin, vmax = prop.interval
                    dfr.loc[(dfr[pidn] >= vmin) & (dfr[pidn] <= vmax), pidx] = 1
                    added_flag_logs.append(pidx)
                    dfr.loc[dfr[pidx] != 1, cname] = 0

            fractions = (
                self.well.dataframe[cname].dropna().value_counts(normalize=True)
            )
            cfrac = fractions.get(1, 0.0)
            clen = totlensegments * cfrac

            rline = [
                real,
                self.well.name,
                nsegments,
                cfrac,
                clen,
                totlensegments,
                mdend - mdstart,
                mdstart,
                mdend,
            ]
            myreport.append(rline)

        self.added_flag_logs = added_flag_logs

        dfr = self.cumlenreport = pd.DataFrame(
            myreport,
            columns=[
                "REAL",
                "WELLNAME",
                "NSEGMENTS",
                "GOODFRACTION",
                "GOODCUMLENGTH",
                "TOTALLENGTH_ACTIVE",
                "TOTALLENGTH_ALL",
                "MDSTART",
                "MDEND",
            ],
        )

        newdfr = dfr[["GOODFRACTION", "GOODCUMLENGTH"]]
        self.cumlenreport_summary = newdfr.describe(percentiles=[0.1, 0.5, 0.9])

        return True

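    # Log naming convention used above, as derived from the code (for readability):
    #   <Prop>_r<N>    sampled property log for realization N (see added_logs/run_compute)
    #   q<Prop>_r<N>   0/1 flag log, 1 where the criteria for <Prop> are fulfilled
    #   _COMB_r<N>     0/1 flag log, 1 only where ALL property criteria are fulfilled
    # GOODFRACTION is the fraction of (non-null) samples with _COMB_r<N> == 1, and
    # GOODCUMLENGTH is that fraction multiplied by the total active segment length.
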
    def optionally_delete_logs(self):
        """Delete intermediate per-realization logs unless they shall be kept."""
        keep = self.cfg.report_keep_intermediate
        if not keep and self.added_logs:
            wll = self.well
            for _, val in self.added_logs.items():
                wll.delete_logs(val)

    def optionally_delete_flag_logs(self):
        """Delete intermediate flag logs unless they shall be kept."""
        keep = self.cfg.report_keep_intermediate
        if not keep and self.added_flag_logs:
            wll = self.well
            wll.delete_logs(self.added_flag_logs)


@no_type_check
def loop_for_compute(
    config: dict, sinfo: ScreenInfo, _dryrun: bool = False
) -> EnsembleWellProps:
    """Collect for computing the ensemble statistics.

    Args:
        config: The input configuration dictionary
        sinfo: Messages to screen instance
        _dryrun: For testing, skipping computation
    """
    cfg = ConfigData(config)

    wcase = WellCase(
        xtgeo.well_from_file(cfg.wellfile, lognames=cfg.lognames),
        cfg.mdlog,
        cfg.mdranges,
        cfg.welldelta,
    )

    grd = None
    sinfo.oprint("Loop data over realizations...")
    used_realizations = []
    for real in cfg.reals:
        sinfo.oprint(f"Realization no. {real}")
        realiterpath = cfg.root / f"realization-{real}" / cfg.itera

        if not isinstance(grd, xtgeo.Grid) or not cfg.gridreuse:
            # one may choose to reuse grid if not structural uncertainty
            sinfo.oprint(f"Read grid geometry for realization {real}")
            gpath = realiterpath / cfg.gridfilestub
            try:
                grd = xtgeo.grid_from_file(gpath)
            except OSError:
                sinfo.oprint(f"Not able to read grid {gpath}, skip realization...")
                continue

        wcase.well.delete_logs(GCELLNAMES)
        wcase.well.make_ijk_from_grid(grd)

        for propcase in cfg.proplist:
            proppath = realiterpath / propcase.filestub
            sinfo.oprint(f"Read: {proppath}...")
            try:
                theprop = xtgeo.gridproperty_from_file(proppath)
            except OSError:
                sinfo.oprint(
                    f"Not able to read property {propcase.name} from {proppath}, "
                    "skip realization..."
                )
                continue

            theprop.geometry = grd
            if _dryrun is False:
                run_compute(real, wcase.well, propcase, theprop)

        used_realizations.append(real)

    sinfo.xprint("Delete logs referring to cells...")
    wcase.well.delete_logs(GCELLNAMES)

    return EnsembleWellProps(wcase.well, used_realizations, cfg, sinfo)


def run_compute(real, well, prop, theprop):
    """Compute well props sampled from gridproperties for one property and realization.

    The well.dataframe is updated in-place with new column(s).
    """
    idn = f"_r{real}"
    theprop.name = prop.name
    well.get_gridproperties(theprop, grid=tuple(GCELLNAMES), prop_id=idn)


@no_type_check
def process_ensemble(ens: EnsembleWellProps):
    """Process the ensemble (seen in well) according to client 'report' requirements.

    Args:
        ens: Instance of EnsembleWellProps which holds all relevant data
    """
    avgstatus = ens.process_ensemble_avglogs()
    cumstatus = ens.process_ensemble_cumlen()

    sinfo = ens.sinfo
    sinfo.oprint("Process ensemble data for well...")
    sinfo.xprint(f"The avgstatus is {avgstatus} and cumstatus is {cumstatus}")

    if avgstatus:
        ens.optionally_delete_logs()
        ens.optionally_delete_flag_logs()
        if ens.cfg.report_show_in_terminal:
            sinfo.cprint(f"Well data:\n{ens.well.dataframe.to_string()}")
        sinfo.oprint(f"Save to RMS well data file: {ens.cfg.report_avg_file}")
        ens.well.to_file(ens.cfg.report_avg_file)

    if cumstatus:
        ens.optionally_delete_logs()
        ens.optionally_delete_flag_logs()
        if ens.cfg.report_show_in_terminal:
            sinfo.cprint(f"\nPer realization:\n{ens.cumlenreport}")
            sinfo.cprint(f"\nSummary:\n{ens.cumlenreport_summary}")
        ens.cumlenreport.to_csv(ens.cfg.report_cum_file + ".csv")
        ens.cumlenreport_summary.to_csv(ens.cfg.report_cum_file + "_summary.csv")
        sinfo.oprint(
            f"Wrote summary to {ens.cfg.report_cum_file}.csv and "
            f"{ens.cfg.report_cum_file}_summary.csv"
        )


def main(args=None):
    """Main routine for ensemble_well_props.

    Args:
        args: Input config; if missing the command line is parsed for YAML file.
    """
    if isinstance(args, dict):
        screeninfo = ScreenInfo()
        config = args
    else:
        args = get_parser_args(args)
        if args.example:
            dump_example_config()
            return
        if args.config:
            screeninfo = ScreenInfo(args.quiet, args.verbose)
            config = parse_config(args.config)

    ens = loop_for_compute(config, screeninfo)
    process_ensemble(ens)


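# Programmatic use (a minimal sketch; file names are placeholders): main() also
# accepts an already parsed config dict instead of command line arguments:
#
#   import yaml
#   from fmu.tools.ensembles import ensemble_well_props
#
#   with open("my_config.yml", encoding="utf-8") as stream:
#       ensemble_well_props.main(yaml.safe_load(stream))
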
if __name__ == "__main__":
    main()