Source code for pyscal.utils.relperm

"""Utility functions for computations on relative permeability curves"""

import logging
from typing import Optional

import numpy as np
import pandas as pd

from ..constants import EPSILON as epsilon
from ..constants import SWINTEGERS

logger = logging.getLogger(__name__)


[docs] def truncate_zeroness( value: float, zeronesslimit: float = 1 / SWINTEGERS, name: str = "", log=True ) -> float: """Check a value for closeness to zero, and return as zero if below a given limit, if not return the value""" if value < zeronesslimit: if log and name and not np.isclose(value, 0.0): logger.warning(f"{name}={value} was close to zero, truncated to 0") return 0.0 return value
[docs] def crosspoint(dframe: pd.DataFrame, satcol: str, kr1col: str, kr2col: str) -> float: """Locate the saturation value (crosspoint) where kr1col == kr2col Args: dframe: Dataframe with at least three columns satcol: Column name for the saturation column kr1col: Column name for first relperm column kr2col: Column name for second column Returns: The saturation value (interpolated) where kr1col == kr2col, when krXcol is linearly interpolated as a function of the saturation values. In case of errors, the function will return the value -1 """ if len(dframe) < 2: return -1 cross_dframe = pd.DataFrame(dframe[[satcol, kr1col, kr2col]]) if cross_dframe.isna().any().any(): logger.error("nan in input to crosspoint()") logger.debug(str(cross_dframe)) return -1 cross_dframe.loc[:, "krdiff"] = cross_dframe[kr1col] - cross_dframe[kr2col] # Add a zero value for the difference column, and interpolate # the saturation column to the zero value zerodf = pd.DataFrame(index=[len(cross_dframe)], data={"krdiff": 0.0}) cross_dframe = pd.concat([cross_dframe, zerodf], sort=True).set_index("krdiff") cross_dframe = cross_dframe[~cross_dframe.index.duplicated(keep="first")] if cross_dframe.shape[0] > 2: cross_dframe = cross_dframe.interpolate(method="slinear") if cross_dframe.isna().any().any(): logger.error("Could not compute crosspoint)") logger.debug(str(cross_dframe)) return -1 return cross_dframe[np.isclose(cross_dframe.index, 0.0)][satcol].values[0]
[docs] def estimate_diffjumppoint( table: pd.DataFrame, xcol: Optional[str] = None, ycol: Optional[str] = None, side: str = "right", ) -> float: """Estimate the point where the y-data jumps from being linear in x to being nonlinear, or where it shift from one linear domain to another (for a piecewise linear function) If xcol is sw, and ycol is krw, and side is 'right', this will typically estimate sorw for you. If side is 'left' it will give you swcr. Args: table: A Dataframe with x and y data xcol: The name of the column in table containing x-data. If None (default) the first column in table will be used. ycol: The name of the column in table containing y-data. If None (default) the second column in the table will be used. side: Must be 'left' or 'right'. Decides whether to look from the right side of the x-interval or from the left side for the linear domain. Returns: The x value where the start-linear domain ends. """ if not xcol: xcol = table.columns[0] if not ycol: ycol = table.columns[1] assert isinstance(ycol, str) assert isinstance(xcol, str) if not side: raise ValueError("side cannot be None, use left or right") side = side.lower() assert side in ["left", "right"] # Compute the derivative: table["_deriv"] = table[ycol].diff() / table[xcol].diff() # The first becomes NaN, extrapolate from the second row: table.loc[0, "_deriv"] = table["_deriv"].iloc[1] # Pick the derivative at the first or last segment: iloc = {"left": 0, "right": -1} lin_a = table["_deriv"].iloc[iloc[side]] # Make a linear extrapolation from the last segment, starting at max x table["_linear"] = (table[xcol] - table[xcol].iloc[iloc[side]]) * lin_a + table[ ycol ].iloc[iloc[side]] assert table["_linear"].values[iloc[side]] == table[ycol].values[iloc[side]] # Compute how much krw deviates from the linear krw: table["_lindev"] = (table[ycol] - table["_linear"]).abs() # Use the cumulative sum to determine the onset of non-zero deviation # starting from sw=1: table["_lindevcumsum"] = table["_lindev"].cumsum() if side == "right": maxcumsum = table["_lindevcumsum"].max() linearpart = table[(table["_lindevcumsum"] - maxcumsum).abs() < epsilon] return linearpart.iloc[1][xcol] linearpart = table[(table["_lindevcumsum"] < epsilon)] if len(linearpart) == 1: linearpart = table[(table["_lindevcumsum"].shift(1) < epsilon)] return linearpart.iloc[-1][xcol]