Source code for fmu.tools.fipmapper.fipmapper

"""The FipMapper class, mapping region/zones in RMS to FIPxxx in Eclipse."""

import collections
import contextlib
import itertools
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

import pandas as pd
import yaml
from disjoint_set import DisjointSet

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)


class FipMapper:
    """Map between regions/zones in RMS and FIPNUMs in Eclipse, both ways."""

    def __init__(
        self,
        *,
        yamlfile: Optional[Union[str, Path]] = None,
        mapdata: Optional[Dict[str, str]] = None,
        skipstring: Optional[Union[List[str], str]] = None,
    ):
        """Represent mappings between region/zones to FIPxxx.

        FipMapper is a class to represent a map between regions/zones in
        the geomodel (RMS) and different region partitions in the dynamic
        model (Eclipse).

        Primary usage is to determine which RMS regions correspond to which
        FIPNUMs, similarly for zones, and in both directions.

        Configuration is via a yaml-file or directly with a dictionary. The
        map can be configured in any direction; a missing direction is
        computed by inverting the one that is given.

        Several data structures in the dictionary can be used, such that
        the needed information can be extracted from the global
        configurations file.

        Assumptions:

        * A FIPNUM always maps to an arbitrary-length list of regions
        * A FIPNUM always maps to an arbitrary-length list of zones
        * A region always maps to an arbitrary-length list of FIPNUMs
        * A zone always maps to an arbitrary-length list of FIPNUMs
        * A region is assumed to be present for all zones, but not relevant
          in this class
        * A zone is assumed to be present for all regions, but not relevant
          in this class

        For FIPNUM, the datatype must be integers, but can be initialized
        from strings as long as they can be parsed as integers.

        For Region and Zone, a string datatype is assumed, but the yaml
        input is allowed to be integers or integers and strings mixed. Some
        functions will return these as integers if they were inputted as
        such, but at least the disjoint_sets() function will always return
        these as strings.

        Args:
            yamlfile: Filename
            mapdata: direct dictionary input. Provide only one of the
                arguments, not both.
            skipstring: List of strings which will be ignored
                (e.g. ["Totals"]). A single string is also accepted.

        Raises:
            ValueError: If both yamlfile and mapdata are given.
            TypeError: If some FIPNUM cannot be parsed as an integer.
        """
        self._mapdata: Dict[str, dict] = {}  # To be filled with data we need.

        # Always define self.skipstring, also when a list is passed in.
        # Leaving it unset makes any later map inversion fail with an
        # AttributeError.
        if skipstring is None:
            self.skipstring: List[str] = []
        elif isinstance(skipstring, str):
            self.skipstring = [skipstring]
        else:
            self.skipstring = list(skipstring)

        if yamlfile is not None and mapdata is not None:
            raise ValueError(
                "Initialize with either yamlfile or explicit data, not both"
            )
        if yamlfile is None and mapdata is None:
            logger.warning("FipMapper initialized with no data")

        if yamlfile is not None:
            logger.info("Loading data from %s", yamlfile)
            with open(yamlfile, encoding="utf-8") as stream:
                yamldata = yaml.safe_load(stream)
            logger.debug(str(yamldata))
        else:
            yamldata = mapdata

        if yamldata is not None:
            self._get_explicit_mapdata(yamldata)

            if "global" in yamldata:
                # This is a fmu-config file.
                self._fipdata_from_fmuconfigyaml(yamldata)

            if "FIPNUM" in yamldata:
                # Webviz YML format
                self._fipdata_from_webvizyaml(yamldata)

        assert isinstance(self._mapdata, dict), "FipMapper needs a dictionary"

        # Determine our capabilities:
        self.has_fip2region = "fipnum2region" in self._mapdata
        self.has_fip2zone = "fipnum2zone" in self._mapdata
        self.has_region2fip = "region2fipnum" in self._mapdata
        self.has_zone2fip = "zone2fipnum" in self._mapdata

        # Validate that all FIPNUMs are integers. Partially empty
        # fipmappers (without a fipnum2region map) have nothing to validate;
        # this is checked explicitly instead of catching the AssertionError
        # from get_fipnums(), which would not fire under "python -O".
        if self.has_fip2region:
            fipnums = self.get_fipnums()
            try:
                for fip in fipnums:
                    int(fip)
            except ValueError as err:
                raise TypeError(
                    f"All FIPNUMs must be integers, got {fipnums}"
                ) from err

    def _store_map_pair(
        self, yamldata: Dict[str, Any], fip2name: str, name2fip: str
    ) -> bool:
        """Store one pair of opposite maps, inverting when one is missing.

        Args:
            yamldata: Configuration object to pick the maps from.
            fip2name: Key of the FIPNUM -> name map, e.g. "fipnum2zone".
            name2fip: Key of the name -> FIPNUM map, e.g. "zone2fipnum".

        Returns:
            True if at least one of the two maps was present in yamldata.
        """
        found = False
        for given, derived in ((fip2name, name2fip), (name2fip, fip2name)):
            if given not in yamldata:
                continue
            found = True
            self._mapdata[given] = yamldata[given]
            if derived not in yamldata:
                # Only one direction is configured, compute the other one.
                logger.debug("Inverting %s to obtain %s", given, derived)
                self._mapdata[derived] = invert_map(
                    self._mapdata[given], skipstring=self.skipstring
                )
        return found

    def _get_explicit_mapdata(self, yamldata: Dict[str, Any]):
        """Fetch explicit mapping configuration from a dictionary.

        Set internal flags when maps are found

        Invert maps when possible/needed. A map that is explicitly given
        is never replaced by an inverted one.

        Args:
            yamldata (dict): Configuration object with predefined items
                at the first level.
        """
        if self._mapdata is None:
            self._mapdata = {}

        if self._store_map_pair(yamldata, "fipnum2region", "region2fipnum"):
            self.has_fip2region = True
            self.has_region2fip = True

        if self._store_map_pair(yamldata, "fipnum2zone", "zone2fipnum"):
            self.has_fip2zone = True
            self.has_zone2fip = True

    def _fipdata_from_fmuconfigyaml(self, yamldict: dict):
        """This function should be able to build mapping from region/zones
        to FIPNUM based on data it finds in a fmu-config
        global_master_config.yml file.

        How that map should be deduced is not yet defined, and we only
        support having the explicit maps "region2fipnum" etc under the
        global section

        Args:
            yamldict (dict):
        """
        self._get_explicit_mapdata(yamldict["global"])

    def _fipdata_from_webvizyaml(self, yamldict: dict):
        """This function loads the Webviz yaml syntax for
        REGION/ZONE to FIPNUM mapping,

        The syntax is defined in
        https://github.com/equinor/webviz-subsurface/blob/master/webviz_subsurface/plugins/_reservoir_simulation_timeseries_regional.py#L1422

        Args:
            yamldict (dict):
        """
        self._get_explicit_mapdata(webviz_to_prtvol2csv(yamldict))

    def _fips2regions(self, fips: List[int]) -> List[List[str]]:
        """Map each FIPNUM in a list to its list of regions."""
        return [self.fip2region(fip_int) for fip_int in fips]

    def get_regions(self) -> List[str]:
        """Obtain a sorted list of the regions that exist in the map"""
        assert "region2fipnum" in self._mapdata, "No data provided for regions"
        try:
            return sorted(self._mapdata["region2fipnum"])
        except TypeError:
            # We get here if some regions are ints and others are strings
            return sorted(map(str, self._mapdata["region2fipnum"]))

    def get_zones(self) -> List[str]:
        """Obtain a sorted list of the zones that exist in the map"""
        assert "zone2fipnum" in self._mapdata, "No data provided for zones"
        try:
            return sorted(self._mapdata["zone2fipnum"])
        except TypeError:
            # We get here if some zones are ints and others are strings
            return sorted(map(str, self._mapdata["zone2fipnum"]))

    def get_fipnums(self) -> List[str]:
        """Obtain a sorted list of the fip numbers that exist in the map

        The values are returned with the datatype they have as keys in the
        fipnum2region map.
        """
        assert "fipnum2region" in self._mapdata, "No data provided for FIPNUMs"
        return sorted(self._mapdata["fipnum2region"])

    def fip2region(self, fip: int) -> List[str]:
        """Maps one FIP(NUM) integer to list of Region strings.

        Each FIPNUM can map to multiple regions, therefore a list is always
        returned for each FIPNUM.

        Args:
            fip: FIPNUM value to look up.

        Returns:
            List of regions, with the datatype they have in the map. Empty
            list if no region is known for the FIPNUM (a warning is logged).
        """
        assert "fipnum2region" in self._mapdata, "No data provided for fip2region"
        try:
            regions = self._mapdata["fipnum2region"][fip]
        except KeyError:
            logger.warning(
                "Unknown fip %s, known map is %s",
                str(fip),
                str(self._mapdata["fipnum2region"]),
            )
            return []
        if not isinstance(regions, list):
            # Single region in input
            return [regions]
        return regions

    def _regions2fips(self, regions: List[str]) -> List[List[int]]:
        """Map each region in a list to its list of FIPNUMs."""
        return [self.region2fip(region) for region in regions]

    def region2fip(self, region: Union[int, str]) -> List[int]:
        """Maps a Region string/int to FIPNUM(s).

        Args:
            region: Region

        Returns:
            FIPNUM values as integers. Empty list if the region is unknown
            (a warning is logged), many values if many FIPNUMs are present
            in the region.
        """
        assert "region2fipnum" in self._mapdata, "No data provided for region2fip"
        if region not in self._mapdata["region2fipnum"]:
            with contextlib.suppress(ValueError):
                # If regions have mixed types in yaml, we are sometimes
                # asked for a region as a stringified integer
                region = int(region)
        try:
            fips = self._mapdata["region2fipnum"][region]
        except KeyError:
            logger.warning(
                "Unknown region %s, known map is %s",
                str(region),
                str(self._mapdata["region2fipnum"]),
            )
            return []
        if not isinstance(fips, list):
            # Single FIPNUM in input
            return [int(fips)]
        return [int(fip) for fip in fips]

    def zone2fip(self, zone: Union[str, int]) -> List[int]:
        """Maps a zone to FIPNUMs

        Args:
            zone: Zone

        Returns:
            FIPNUM values as integers. Empty list if the zone is unknown
            (a warning is logged).
        """
        assert "zone2fipnum" in self._mapdata, "No data provided for zone2fip"
        if zone not in self._mapdata["zone2fipnum"]:
            with contextlib.suppress(ValueError):
                # If zones have mixed types in yaml, we are sometimes
                # asked for a zone as a stringified integer
                zone = int(zone)
        try:
            fips = self._mapdata["zone2fipnum"][zone]
        except KeyError:
            logger.warning(
                "Unknown zone %s, known map is %s",
                str(zone),
                str(self._mapdata["zone2fipnum"]),
            )
            return []
        if not isinstance(fips, list):
            # Single FIPNUM in input
            return [int(fips)]
        return [int(fip) for fip in fips]

    def _fips2zones(self, fips: List[int]) -> List[List[str]]:
        """Map each FIPNUM in a list to its list of zones."""
        return [self.fip2zone(fip) for fip in fips]

    def fip2zone(self, fip: int) -> List[str]:
        """Maps a FIPNUM integer to a list of zones

        Args:
            fip: FIPNUM value to look up.

        Returns:
            list: Zones, with the datatype they have in the map. Always
            returned as a list. Empty list if no zone is assigned to the
            FIPNUM (a warning is logged).
        """
        assert "fipnum2zone" in self._mapdata, "No data provided for fip2zone"
        try:
            zones = self._mapdata["fipnum2zone"][fip]
        except KeyError:
            logger.warning("The zone belonging to FIPNUM %s is unknown", str(fip))
            return []
        if not isinstance(zones, list):
            # Single zone for this FIPNUM
            return [zones]
        return zones

    def regzone2fip(self, region: str, zone: str) -> List[int]:
        """Return the sorted FIPNUMs present in both the region and the zone."""
        fipreg = self.region2fip(region)
        fipzon = self.zone2fip(zone)
        return sorted(set(fipreg) & set(fipzon))

    def disjoint_sets(self) -> pd.DataFrame:
        """Determine the minimal disjoint sets of a reservoir

        The disjoint sets returned consist of sets that can be split into
        both a set of FIPxxxx list and a region/zone list. Thus, the sum of
        any additive property is comparable on these disjoint sets.

        The returned object is a dataframe that is to be used to group
        together fipnums or regions/zones so they are summable.

        Note that the REGION and ZONE columns always contain strings only,
        while FIPNUM is always an integer.

        Each row represents a cell in the partition where both region, zone
        and fipnum boundaries apply, thus the finest possible partition the
        fipmapper data allows. Each row is then assigned to an integer
        identifier in the ``SET`` column. The chosen integer values for each
        set are based on lexicographical sorting of regions, zones and
        fipnum values. These sets signify the minimal grouping of data that
        must be applied in order for volumes in the region/zone partition
        or fipnum partition to be comparable.

        If no region/zone combination shares any FIPNUM, an empty dataframe
        with the same columns is returned.
        """
        # Generate all possible combinations of the regions and
        # zones we know of:
        regzone_df = pd.DataFrame(
            columns=["REGION", "ZONE"],
            data=itertools.product(self.get_regions(), self.get_zones()),
        )

        # Map all of the region-zone combinations into the accompanying FIPNUMs:
        regzone_df["FIPNUMS"] = regzone_df.apply(
            lambda x: self.regzone2fip(x["REGION"], x["ZONE"]), axis=1
        )

        # The dataframe has lists in the FIPNUMS column when a reg/zone maps
        # to multiple FIPNUMs. Unroll these into one row pr linked FIPNUM:
        dframe = _expand_regzone_df(regzone_df)

        if dframe.empty:
            # No cells at all. The code below needs the REGION/ZONE/FIPNUM
            # columns, which an empty frame built from zero rows lacks.
            return pd.DataFrame(
                columns=["REGION", "ZONE", "FIPNUM", "REGZONE", "SET"]
            )

        # The `dframe` now has one row pr. smallest "cell" that is interesting
        # in the current context. In some sense, the "intersection" of all
        # possible partitions.

        # Create a dataframe of all possible combinations of these smallest
        # cells. When pandas >= 1.2 can be assumed, this is equivalent to
        # dframe.merge(dframe, how="cross").
        edges = pd.merge(
            dframe.assign(dummy=1), dframe.assign(dummy=1), on="dummy"
        ).drop("dummy", axis=1)

        # A partition is equivalent to an equivalence relation.
        # Apply an equivalence relation to the cell combinations:
        edges["NEIGHBOURS"] = edges.apply(
            lambda x: _equivalent_cells(
                x["REGION_x"],
                x["ZONE_x"],
                x["FIPNUM_x"],
                x["REGION_y"],
                x["ZONE_y"],
                x["FIPNUM_y"],
            ),
            axis=1,
        )

        # Filter to only edges that determine which cell linkages
        # that should be grouped:
        neighbourlist = edges[edges["NEIGHBOURS"]]

        # Construct a disjoint set object of all the smallest cells that are
        # to be grouped/unionized:
        ds: DisjointSet = DisjointSet()
        for _, row in dframe.iterrows():
            ds.find((row["REGION"], row["ZONE"], row["FIPNUM"]))

        # Apply the union-find algorithm to determine the partition
        # where all equivalence relations are obeyed:
        for _, pair in neighbourlist.iterrows():
            ds.union(
                (pair["REGION_x"], pair["ZONE_x"], pair["FIPNUM_x"]),
                (pair["REGION_y"], pair["ZONE_y"], pair["FIPNUM_y"]),
            )

        # The union-find algorithm has now "named" each of the components
        # in the disjoint set by a somewhat random mother/root node. This root
        # node is not any more a root compared to the other cells in the set,
        # so each set is instead mapped to consecutive integers, numbered by
        # first appearance among the sorted cells.
        id_dict: dict = collections.defaultdict(lambda: len(id_dict))
        roots = dframe.sort_values(["REGION", "ZONE", "FIPNUM"]).apply(
            lambda x: ds.find((x["REGION"], x["ZONE"], x["FIPNUM"])), axis=1
        )
        # Assign through the index so each identifier lands on the row it
        # was computed for, independent of the row order of dframe.
        dframe["SET"] = pd.Series(
            [id_dict[root] for root in roots], index=roots.index
        )

        dframe["REGION"] = dframe["REGION"].astype(str)
        dframe["ZONE"] = dframe["ZONE"].astype(str)
        dframe["FIPNUM"] = dframe["FIPNUM"].astype(int)

        return dframe
def _equivalent_cells( reg1: Any, zon1: Any, fip1: Any, reg2: Any, zon2: Any, fip2: Any ) -> bool: """Define the equivalence relation for the reg-zone-fip reservoir partition A pair of reg-zone-fip is in the same group if they must be treated together when (and have properties summed). Say if you have a value for a specific region, but this region contains two FIPNUMs. Then we can never treat these two FIPNUMs separately, they must be summed in order to be comparable to the value for the region """ return ((reg1 == reg2) and (zon1 == zon2)) or (fip1 == fip2)
def regions_in_set(dframe: pd.DataFrame) -> Dict[int, List[str]]:
    """Map each set index to the sorted, unique regions in that set.

    Args:
        dframe: The dataframe emitted by disjoint_sets()
    """
    if dframe.empty:
        return {}
    unique_per_set = dframe.groupby("SET")["REGION"].apply(set)
    return unique_per_set.apply(sorted).to_dict()
def zones_in_set(dframe: pd.DataFrame) -> Dict[int, List[str]]:
    """Map each set index to the sorted, unique zones in that set.

    Args:
        dframe: The dataframe emitted by disjoint_sets()
    """
    if dframe.empty:
        return {}
    unique_per_set = dframe.groupby("SET")["ZONE"].apply(set)
    return unique_per_set.apply(sorted).to_dict()
def fipnums_in_set(dframe: pd.DataFrame) -> Dict[int, List[int]]:
    """Map each set index to the sorted, unique FIPNUM values in that set.

    Args:
        dframe: The dataframe emitted by disjoint_sets()
    """
    if dframe.empty:
        return {}
    unique_per_set = dframe.groupby("SET")["FIPNUM"].apply(set)
    return unique_per_set.apply(sorted).to_dict()
def regzonefips_in_set(dframe: pd.DataFrame) -> Dict[int, List[Tuple[str, str, int]]]:
    """Map each set index to its sorted (region, zone, fipnum) tuples.

    The input dataframe is not modified.

    Args:
        dframe: The dataframe emitted by disjoint_sets()
    """
    if dframe.empty:
        return {}
    triplets = dframe[["REGION", "ZONE", "FIPNUM"]].apply(tuple, axis=1)
    # assign() works on a copy, so the caller's frame is left untouched.
    tagged = dframe.assign(**{"reg-zone-fip": triplets})
    unique_per_set = tagged.groupby("SET")["reg-zone-fip"].apply(set)
    return unique_per_set.apply(sorted).to_dict()
def webviz_to_prtvol2csv(webvizdict: dict):
    """Translate a Webviz-style region/zone map into the prtvol2csv format.

    The REGION and ZONE maps are read from the "groups" level below
    "FIPNUM" when that level exists, otherwise directly from "FIPNUM".
    An empty dict is returned when "FIPNUM" is missing or not a dict.
    """
    fipnum_section = webvizdict["FIPNUM"] if "FIPNUM" in webvizdict else None
    if not isinstance(fipnum_section, dict):
        return {}

    # The "groups" level might go away, so both layouts are accepted:
    if "groups" in fipnum_section:
        source = fipnum_section["groups"]
    else:
        source = fipnum_section

    translation = (("REGION", "region2fipnum"), ("ZONE", "zone2fipnum"))
    return {
        prtvol_key: source[webviz_key]
        for webviz_key, prtvol_key in translation
        if webviz_key in source
    }
def invert_map(
    dictmap: Dict[str, Any], skipstring: Optional[Union[list, str]] = None
) -> Dict[str, List[Any]]:
    """Invert a dictionary, supporting many-to-many maps.

    Every value (or every element of a list value) becomes a key in the
    result, mapped to the sorted list of original keys pointing to it.
    If those keys have mixed types and cannot be sorted, they are sorted
    as strings instead.

    Args:
        dictmap: The map to invert.
        skipstring: List of strings which will be ignored (e.g. "Totals").
            Entries whose key or whole value is listed here are dropped.
    """
    if skipstring is None:
        ignored: Any = []
    elif isinstance(skipstring, str):
        ignored = [skipstring]
    else:
        ignored = skipstring

    # Collect the original keys pr. value in sets, to drop duplicates.
    collected: Dict[Any, set] = {}
    for source, targets in dictmap.items():
        if source in ignored or targets in ignored:
            continue
        for target in targets if isinstance(targets, list) else [targets]:
            collected.setdefault(target, set()).add(source)

    inverted: Dict[str, List[Any]] = {}
    for target, sources in collected.items():
        try:
            inverted[target] = sorted(sources)
        except TypeError:
            # Datatype of keys are mixed, typically int and str.
            inverted[target] = sorted(map(str, sources))
    return inverted
def _expand_regzone_df(dframe: pd.DataFrame, fipname: str = "FIPNUM") -> pd.DataFrame: """Unroll dataframe rows with a FIPNUM list in the "FIPNUMS" column""" new_rows = [] for _, row in dframe.iterrows(): for fipnumber in row[fipname + "S"]: new_rows.append( { "REGION": row["REGION"], "ZONE": row["ZONE"], fipname: fipnumber, "REGZONE": str(row["REGION"]) + "-" + str(row["ZONE"]), } ) return pd.DataFrame(new_rows)