"""Sw J or BVW or Brooks-Corey general functions, by direct or integration.
Solve a system on form:
sw = a(m + x*h)^b
The "x" can e.g. be the J term in the Leverett equation, but without the height term.
In many cases m is zero.
The public library `xtgeo` must be available.
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import Any
import numpy as np
import xtgeo
# custom logger for this module
logger = logging.getLogger(__name__)
class SwFunction:
"""Generic Sw calc for formulation Sw = a * (m + x * h)^b.
Some theory for this is shown here: :download:`pdf <pdf/sw_calc.pdf>`.
grid: The xtgeo grid object
x: The ``x`` term in the generic saturation. For a simplified J function, this
will be sqrt(perm/poro).
a: The ``a`` term in the generic saturation.
b: The ``b`` term in the generic saturation.
ffl: The free fluid level, either as a number or a 3D xtgeo gridproperty
m: The ``m`` term in the equation above, either as a number or a 3D xtgeo
gridproperty object. Defaults to zero.
swira: The absolute irreducable satuation applied as asymptote in the equation.
Defaults to zero.
swmax: The maximum applied as asymptote in the equation.
Defaults to 1.
method: How to look at cell geometry. There are three options: 'cell_center',
'cell_center-above_ffl' and 'cell_corners_above_ffl'.
gridname: Optional - only needed when debug keyword is True.
project: Optional. Must be included when working in RMS.
invert: Optional, will in case invert ``a`` and ``b`` in case the J function
(or similar) is formulated as Sw = (J/A)^(1/B). This is the case in e.g.
zdepth: Optional. May speed up computation if provided, in case the function
is called several time for the same grid.
hcenter: Optional. May speed up computation if provided, in case the function
is called several time for the same grid, or hcenter is pre-computed in a
special way. The hcenter will work with direct method only, not integration.
htop: Optional. May speed up computation if provided, in case the function
is called several time for the same grid.
hbot: Optional. May speed up computation if provided, in case the function
is called several time for the same grid.
debug: If True, several "check parameters" will we given, either as grid
properties (if working in RMS) or as ROFF files (if working outside RMS).
tag: Optional string to identify debug parameters.
grid: xtgeo.Grid
# all input are constants or xtgeo grid or gridproperty
x: float | xtgeo.GridProperty = 0.0
a: float | xtgeo.GridProperty = 1.0
b: float | xtgeo.GridProperty = -1.0
ffl: float | xtgeo.GridProperty = 999.0
m: float | xtgeo.GridProperty = 0.0
swira: float | xtgeo.GridProperty = 0.0
swmax: float | xtgeo.GridProperty = 1.0
method: str = "cell_center_above_ffl"
gridname: str = "" # only used when debugging in RMS
project: Any | None = None
invert: bool = False # if the SwJ function is on "reverse" form
# if None they will be computed here; otherwise they can be given explicitly:
hcenter: xtgeo.GridProperty = None
htop: xtgeo.GridProperty = None
hbot: xtgeo.GridProperty = None
# debug flag for additional parameters. Given in RMS if project is given; otherwise
# as files on your working folder
debug: bool = False
tag: str = "" # identification tag to add to debug params
# derived and internal
_swval: np.ma.MaskedArray = field(init=False)
_sw: Any = field(init=False)
def __post_init__(self) -> None:
"""Post init."""
if self.tag and not self.tag.startswith("_"):
self.tag = "_" + self.tag
if all([self.hcenter, self.htop, self.hbot]):
logger.info("Essential geometries are pre-computed")
if self.debug:
if self.project:
self.grid.to_roxar(self.project, "DEBUG_" + self.gridname)
def _process_input(self) -> None:
"""Work with a, b, x, ie. inversing and convert from float."""
logger.info("Process a, b, etc")
if self.method not in ALLOWED_METHODS:
raise ValueError(
f"The method <{self.method}> is invalid. Allowed: {ALLOWED_METHODS}"
if isinstance(self.a, (int, float)):
self.a = xtgeo.GridProperty(self.grid, values=self.a)
if isinstance(self.b, (int, float)):
self.b = xtgeo.GridProperty(self.grid, values=self.b)
if isinstance(self.x, (int, float)):
self.x = xtgeo.GridProperty(self.grid, values=self.x)
if isinstance(self.m, (int, float)):
self.m = xtgeo.GridProperty(self.grid, values=self.m)
if isinstance(self.ffl, (int, float)):
self.ffl = xtgeo.GridProperty(self.grid, values=self.ffl)
if isinstance(self.swira, (int, float)):
self.swira = xtgeo.GridProperty(self.grid, values=self.swira)
if isinstance(self.swmax, (int, float)):
self.swmax = xtgeo.GridProperty(self.grid, values=self.swmax)
if self.invert:
orig_a = self.a.values.mean()
orig_b = self.b.values.mean()
self.b.values = 1.0 / self.b.values
self.a.values = np.ma.power(1.0 / self.a.values, self.b.values)
new_a = self.a.values.mean()
new_b = self.b.values.mean()
"Inverted a and b; original average was %s %s, new is %s %s",
def _compute_htop_hbot(self) -> None:
"""Setting geometries for 'bottom' and 'top' cell.
All these values are relative to the given contact, as "height above". For
cells under the contact, zero or a negative number will be applied.
assert isinstance(self.ffl, xtgeo.GridProperty) # mypy
htop, hbot, hmid = self.grid.get_heights_above_ffl(self.ffl, option=self.method)
self.htop = htop
self.hbot = hbot
self.hcenter = hmid
logger.info("Use method %s", self.method)
if self.debug:
htop.name = "TOP_SW" + self.tag
hbot.name = "BOT_SW" + self.tag
hmid.name = "CENTER_SW" + self.tag
if self.project:
logger.debug("TAG is ", self.tag)
htop.to_roxar(self.project, self.gridname, "HTOP" + self.tag)
hbot.to_roxar(self.project, self.gridname, "HBOT" + self.tag)
hmid.to_roxar(self.project, self.gridname, "HCENTER" + self.tag)
def _sw_function_direct(self) -> None:
"""Use function on form Sw = A*(M + X*h)^B; generic function!"""
assert isinstance(self.a, xtgeo.GridProperty) # mypy
assert isinstance(self.b, xtgeo.GridProperty) # mypy
assert isinstance(self.x, xtgeo.GridProperty) # mypy
assert isinstance(self.m, xtgeo.GridProperty) # mypy
assert isinstance(self.swira, xtgeo.GridProperty) # mypy
assert isinstance(self.swmax, xtgeo.GridProperty) # mypy
# the direct function is mostly used to compare with integrated approach, as QC
height = self.hcenter.values
xh_val = self.x.values * height
self._swval = self.a.values * np.ma.power(self.m.values + xh_val, self.b.values)
# normalize before any limitations
self._swval = (
self.swira.values + (self.swmax.values - self.swira.values) * self._swval
self._swval = np.ma.where(height <= 0.0, 1, self._swval)
self._swval = np.ma.where(self._swval > 1.0, 1.0, self._swval)
# store result
self._sw = xtgeo.GridProperty(self.grid, values=self._swval)
def _sw_function_integrate_w_mterm(self) -> None:
"""Integrate a function on form Sw = A*(M + JX*H)^B; generic function!
It assumes here:
* A: self.a
* B: self.b
* X: self.x
* M: self.m
* self.ffl is the free fluid level for the phase under investigation
assert isinstance(self.a, xtgeo.GridProperty) # mypy
assert isinstance(self.b, xtgeo.GridProperty) # mypy
assert isinstance(self.x, xtgeo.GridProperty) # mypy
assert isinstance(self.m, xtgeo.GridProperty) # mypy
assert isinstance(self.swira, xtgeo.GridProperty) # mypy
assert isinstance(self.swmax, xtgeo.GridProperty) # mypy
ht = (
((1.0 / self.a.values) ** (1.0 / self.b.values)) - self.m.values
) / self.x.values # threshold height
h2 = self.htop.values.copy() # h_top or H2 in integration
h1 = self.hbot.values.copy() # h_bot or H1 in integration
if self.debug:
tmp = xtgeo.GridProperty(self.grid, values=ht, name="HT" + self.tag)
if self.project:
tmp.to_roxar(self.project, self.gridname, "THRESHOLD HEIGHT" + self.tag)
water = h2 * 0.0
water = np.ma.where(h2 < ht, 1.0, water)
water = np.ma.where(h2 <= 0.0, 1.0, water) # may occur for negative ht
# get all possible corner cases when a cell is close to free fluid level
# some of these are probably hitting twice or more; the point is to integrate
# over correct interval
h1x = np.ma.where(h1 <= 0.0, 0.0, h1)
h2x = np.ma.where(h2 <= h1x, h1x, h2)
htx = ht - h1x
htx = np.ma.where(htx <= 0.0, 0.0, htx)
htx = np.ma.where(h2x <= 0.0, 0.0, htx)
hxx = np.ma.where(ht > h1x, ht, h1x)
dh = h2x - h1x
dh = np.ma.where(dh < 0.001, 0.001, dh) # avoid zero division
self._swval = (
+ self.a.values
/ (self.x.values * (self.b.values + 1))
* (
np.ma.power(self.m.values + self.x.values * h2x, self.b.values + 1)
- np.ma.power(self.m.values + self.x.values * hxx, self.b.values + 1)
) / dh
# mark collapsed cells, where direct calulation is needed
mark = (h2x - h1x) * 0.0
mark = np.ma.where(dh <= 0.001, 1.0, mark)
jval = self.x.values * h2x
coll_swval = self.a.values * np.ma.power(self.m.values + jval, self.b.values)
self._swval = np.ma.where(mark > 0.5, coll_swval, self._swval)
# normalize before any limitations
self._swval = (
self.swira.values + (self.swmax.values - self.swira.values) * self._swval
# limit
self._swval = np.ma.where(water > 0.5, 1.0, self._swval)
self._swval = np.ma.where(self._swval > 1.0, 1.0, self._swval)
self._swval = np.ma.where(self._swval < 0.00099, 0.00099, self._swval)
self._sw = xtgeo.GridProperty(self.grid, values=self._swval)
def _compute_integrated(self) -> None:
"""Compute Sw, integrated version."""
logger.info("Integration, do integration over height of cell...")
if self._sw.values.max() > 1.0:
raise RuntimeError(f"SW max out of range: {self._sw.values.max()}")
if self._sw.values.min() < 0.0:
raise RuntimeError(f"SW min out of range: {self._sw.values.min()}")
def _compute_direct(self) -> None:
"""Compute Sw, direct version (less precise wiht thick cells)."""
logger.info("Direct, no integration over height...")
if self._sw.values.max() > 1.0:
raise RuntimeError(f"SW max out of range: {self._sw.values.max()}")
if self._sw.values.min() < 0.0:
raise RuntimeError(f"SW min out of range: {self._sw.values.min()}")
def compute(self, compute_method: str = "integrated") -> xtgeo.GridProperty:
"""Common compute function for saturation, and returns the Sw property"""
if compute_method == "integrated":
return self._sw