"""Nested hybrid grid creation.
Create a merged grid where one region is refined (subdivided) and stitched
back into the original grid via Non-Neighbour Connections (NNCs).
Public API
----------
create_nested_hybrid_grid : function
Build a nested hybrid grid from a coarse grid, a region property,
and a refinement specification.
nnc_to_gridproperty : function
Convert NNC transmissibility DataFrames to GridProperty instances.
nnc_to_flowsimulator_input : function
Write NNC transmissibilities to a flow-simulator input file.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
import numpy as np
import pandas as pd
import xtgeo
if TYPE_CHECKING:
import os
from collections.abc import Iterable
_logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _crop_for_region(
grid: xtgeo.Grid,
region: xtgeo.GridProperty,
region_id: int,
) -> tuple[xtgeo.Grid, tuple[int, int, int]]:
"""Crop *grid* to the bounding box of *region_id*.
Returns (cropped_grid, crop_origin) where *crop_origin* is the 0-based
(i, j, k) offset of the crop start inside the original grid.
"""
region_values = region.values
region_indices = np.where(region_values == region_id)
imin = int(region_indices[0].min() + 1)
imax = int(region_indices[0].max() + 1)
jmin = int(region_indices[1].min() + 1)
jmax = int(region_indices[1].max() + 1)
kmin = int(region_indices[2].min() + 1)
kmax = int(region_indices[2].max() + 1)
_logger.info(
"Region %d bounding box (1-based): i=%d-%d, j=%d-%d, k=%d-%d",
region_id,
imin,
imax,
jmin,
jmax,
kmin,
kmax,
)
cropped_grid = grid.copy()
cropped_grid.crop((imin, imax), (jmin, jmax), (kmin, kmax), props="all")
_logger.info("Cropped grid dimensions: %s", cropped_grid.dimensions)
return cropped_grid, (imin - 1, jmin - 1, kmin - 1)
def _find_boundary_faces(
region_prop: xtgeo.GridProperty,
target_region: int,
) -> list[tuple[tuple[int, int, int], tuple[int, int, int], str]]:
"""Find cell faces on the boundary between *target_region* and other active regions.
Returns a list of ``(outside_ijk, inside_ijk, face_dir)`` where indices
are 0-based and *face_dir* is one of ``'i+', 'i-', 'j+', 'j-', 'k+', 'k-'``.
"""
filled = np.ma.filled(region_prop.values, fill_value=-1).astype(int)
ni, nj, nk = filled.shape
in_target = filled == target_region
outside_active = (filled != target_region) & (filled != -1)
faces: list[tuple[tuple[int, int, int], tuple[int, int, int], str]] = []
# i+
mask = in_target[: ni - 1, :, :] & outside_active[1:ni, :, :]
for i, j, k in np.argwhere(mask):
faces.append(((int(i + 1), int(j), int(k)), (int(i), int(j), int(k)), "i+"))
# i-
mask = in_target[1:ni, :, :] & outside_active[: ni - 1, :, :]
for idx, j, k in np.argwhere(mask):
faces.append(((int(idx), int(j), int(k)), (int(idx + 1), int(j), int(k)), "i-"))
# j+
mask = in_target[:, : nj - 1, :] & outside_active[:, 1:nj, :]
for i, j, k in np.argwhere(mask):
faces.append(((int(i), int(j + 1), int(k)), (int(i), int(j), int(k)), "j+"))
# j-
mask = in_target[:, 1:nj, :] & outside_active[:, : nj - 1, :]
for i, jdx, k in np.argwhere(mask):
faces.append(((int(i), int(jdx), int(k)), (int(i), int(jdx + 1), int(k)), "j-"))
# k+
mask = in_target[:, :, : nk - 1] & outside_active[:, :, 1:nk]
for i, j, k in np.argwhere(mask):
faces.append(((int(i), int(j), int(k + 1)), (int(i), int(j), int(k)), "k+"))
# k-
mask = in_target[:, :, 1:nk] & outside_active[:, :, : nk - 1]
for i, j, kdx in np.argwhere(mask):
faces.append(((int(i), int(j), int(kdx)), (int(i), int(j), int(kdx + 1)), "k-"))
_logger.info("Found %d boundary faces for region %d", len(faces), target_region)
return faces
def _compute_nnc_table(
region_prop: xtgeo.GridProperty,
target_region_id: int,
crop_origin: tuple[int, int, int],
refinement: tuple[int, int, int],
coarse_ncol: int,
) -> pd.DataFrame:
"""Compute NNC cell-pair mapping between mother and refined cells.
For each boundary face between the target region and the surrounding mother
cells, this function determines which refined sub-cells in the merged grid
connect to which mother cell, and through which face direction.
The mapping is purely topological (index-based) — no geometric computation
is performed here. The resulting table is intended to be passed to
:meth:`xtgeo.Grid.get_transmissibilities` so that it can compute the
actual transmissibility for each cell pair.
Convention:
- ``I1, J1, K1`` is always the **mother** cell (1-based, merged grid).
- ``I2, J2, K2`` is always the **refined** cell (1-based, merged grid).
- ``DIRECTION`` is from the mother cell's perspective (e.g. ``"I+"``
means looking in the positive I-direction from the mother cell
you reach the refined cell).
Args:
region_prop: Region property on the original (unmodified) grid.
target_region_id: Region value that was refined.
crop_origin: 0-based ``(i0, j0, k0)`` origin of the crop box.
refinement: ``(rcol, rrow, rlay)`` refinement factors.
coarse_ncol: Number of columns in the coarse grid (grid1 in the merge).
Returns:
A DataFrame with columns ``I1, J1, K1, I2, J2, K2, DIRECTION``.
"""
faces = _find_boundary_faces(region_prop, target_region_id)
rcol, rrow, rlay = refinement
i0, j0, k0 = crop_origin
# In the merged grid, grid2 (refined) starts after a 1-column gap:
i_offset = coarse_ncol + 1
rows: list[dict[str, int | str]] = []
for outside_ijk, inside_ijk, face_dir in faces:
# outside_ijk = mother cell (0-based in original/merged grid)
mi, mj, mk = outside_ijk
# inside_ijk = target cell (0-based in original grid) → cropped coords
ci = inside_ijk[0] - i0
cj = inside_ijk[1] - j0
ck = inside_ijk[2] - k0
# Determine direction from mother and which refined cells lie on the face.
# face_dir is from the *inside* (target) cell's perspective;
# the mother's perspective is the opposite sign.
#
# For I-faces: the varying refined indices are J and K (rrow × rlay cells)
# For J-faces: the varying refined indices are I and K (rcol × rlay cells)
# For K-faces: the varying refined indices are I and J (rcol × rrow cells)
ref_is: Iterable[int]
ref_js: Iterable[int]
ref_ks: Iterable[int]
if face_dir == "i-":
# Target at higher I than mother → mother's I+ face
direction = "I+"
ref_is = [ci * rcol] # first i-column of refined block (I- face)
ref_js = range(cj * rrow, cj * rrow + rrow)
ref_ks = range(ck * rlay, ck * rlay + rlay)
elif face_dir == "i+":
# Target at lower I than mother → mother's I- face
direction = "I-"
ref_is = [ci * rcol + rcol - 1] # last i-column (I+ face)
ref_js = range(cj * rrow, cj * rrow + rrow)
ref_ks = range(ck * rlay, ck * rlay + rlay)
elif face_dir == "j-":
# Target at higher J than mother → mother's J+ face
direction = "J+"
ref_is = range(ci * rcol, ci * rcol + rcol)
ref_js = [cj * rrow] # first j-row (J- face)
ref_ks = range(ck * rlay, ck * rlay + rlay)
elif face_dir == "j+":
# Target at lower J than mother → mother's J- face
direction = "J-"
ref_is = range(ci * rcol, ci * rcol + rcol)
ref_js = [cj * rrow + rrow - 1] # last j-row (J+ face)
ref_ks = range(ck * rlay, ck * rlay + rlay)
elif face_dir == "k-":
# Target at higher K than mother → mother's K+ face
direction = "K+"
ref_is = range(ci * rcol, ci * rcol + rcol)
ref_js = range(cj * rrow, cj * rrow + rrow)
ref_ks = [ck * rlay] # first k-layer (K- face)
elif face_dir == "k+":
# Target at lower K than mother → mother's K- face
direction = "K-"
ref_is = range(ci * rcol, ci * rcol + rcol)
ref_js = range(cj * rrow, cj * rrow + rrow)
ref_ks = [ck * rlay + rlay - 1] # last k-layer (K+ face)
else:
raise ValueError(f"Unexpected face direction: {face_dir!r}")
for ri in ref_is:
for rj in ref_js:
for rk in ref_ks:
rows.append(
{
"I1": mi + 1,
"J1": mj + 1,
"K1": mk + 1,
"I2": ri + i_offset + 1,
"J2": rj + 1,
"K2": rk + 1,
"DIRECTION": direction,
}
)
_logger.info(
"NNC table: %d cell pairs from %d boundary faces", len(rows), len(faces)
)
return pd.DataFrame(rows, columns=["I1", "J1", "K1", "I2", "J2", "K2", "DIRECTION"])
def _set_actnum_by_region(
grid: xtgeo.Grid,
region_prop: xtgeo.GridProperty,
target_region: int,
*,
invert: bool = False,
) -> None:
"""Deactivate cells based on region membership."""
actnum = grid.get_actnum()
region_values = region_prop.values
mask = region_values != target_region if invert else region_values == target_region
_logger.info(
"Deactivating %d cells (region %s %d)",
np.sum(mask),
"!=" if invert else "==",
target_region,
)
actnum.values[mask] = 0
grid.set_actnum(actnum)
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
[docs]
def create_nested_hybrid_grid(
grid: xtgeo.Grid,
region: xtgeo.GridProperty,
target_region_id: int,
refinement: tuple[int, int, int],
) -> tuple[xtgeo.Grid, pd.DataFrame]:
"""Create a nested hybrid grid by refining one region and merging it back.
The cells belonging to *target_region_id* are replaced by a refined
(subdivided) version of the same region. A ``NEST_ID`` discrete property
is attached to the merged grid, encoding the nested hybrid structure:
- ``NEST_ID == 1``: coarse (mother) grid cells.
- ``NEST_ID == 2``: refined grid cells.
In addition, a **NNC mapping table** is returned that lists every
mother ↔ refined cell pair that should be connected by a Non-Neighbour
Connection (NNC). The table is derived from the topological knowledge
available at merge time (which original cell was refined and how its
sub-cells map into the merged grid).
The table columns are:
- ``I1, J1, K1``: mother cell indices (1-based) in the merged grid.
- ``I2, J2, K2``: refined cell indices (1-based) in the merged grid.
- ``DIRECTION``: face direction from the mother cell's perspective
(``I+``, ``I-``, ``J+``, ``J-``, ``K+``, ``K-``).
This table can be passed to
:meth:`xtgeo.Grid.get_transmissibilities` to compute NNC
transmissibilities for the specified cell pairs.
Args:
grid: The original coarse grid.
region: A :class:`xtgeo.GridProperty` whose values identify the
regions (e.g. an integer region parameter).
target_region_id: The region value to refine.
refinement: ``(ncol, nrow, nlay)`` refinement factors.
Returns:
A tuple ``(merged_grid, nnc_table)`` where *merged_grid* is a new
:class:`xtgeo.Grid` with the refined region stitched back into the
coarse grid and *nnc_table* is a :class:`pandas.DataFrame` mapping
mother cells to their connected refined cells.
"""
if any(r < 1 for r in refinement):
raise ValueError(f"Refinement factors must be >= 1, got {refinement}")
if region.dimensions != grid.dimensions:
raise ValueError(
f"Region property dimensions {region.dimensions} do not match "
f"grid dimensions {grid.dimensions}"
)
# Make working copies so the caller's objects are not mutated.
grid = grid.copy()
region = region.copy()
# Attach the region property to the grid.
grid.append_prop(region)
# 1. Crop to the bounding box of the target region.
cropped, crop_origin = _crop_for_region(grid, region, target_region_id)
# 2. Refine the cropped grid.
refined = cropped.copy()
rcol, rrow, rlay = refinement
refined.refine(refine_col=rcol, refine_row=rrow, refine_layer=rlay)
_logger.info("Refined cropped grid dimensions: %s", refined.dimensions)
# 3. Compute the NNC mapping table *before* deactivation mutates anything.
# This uses the original region property to find boundary faces and
# maps them through the crop → refine → merge index chain.
nnc_table = _compute_nnc_table(
region_prop=region,
target_region_id=target_region_id,
crop_origin=crop_origin,
refinement=refinement,
coarse_ncol=grid.ncol,
)
# 4. Deactivate the target region in the coarse grid (will be replaced).
coarse_region = grid.get_prop_by_name(region.name)
_set_actnum_by_region(grid, coarse_region, target_region_id, invert=False)
# 5. In the refined grid keep only target-region cells active.
refined_region = refined.get_prop_by_name(region.name)
_set_actnum_by_region(refined, refined_region, target_region_id, invert=True)
# 6. Create NEST_ID properties before merging (1=mother, 2=refined).
nest_id_coarse = xtgeo.GridProperty(
grid,
name="NEST_ID",
discrete=True,
values=np.where(grid.get_actnum().values == 1, 1, 0).astype(np.int32),
codes={0: "inactive", 1: "mother", 2: "refined"},
)
grid.append_prop(nest_id_coarse)
nest_id_refined = xtgeo.GridProperty(
refined,
name="NEST_ID",
discrete=True,
values=np.where(refined.get_actnum().values == 1, 2, 0).astype(np.int32),
codes={0: "inactive", 1: "mother", 2: "refined"},
)
refined.append_prop(nest_id_refined)
# 7. Merge the two grids.
merged = xtgeo.grid_merge(grid, refined)
_logger.info("Merged grid dimensions: %s", merged.dimensions)
return merged, nnc_table
[docs]
def nnc_to_gridproperty(
grid: xtgeo.Grid,
nnc_df: pd.DataFrame,
) -> tuple[xtgeo.GridProperty, xtgeo.GridProperty, xtgeo.GridProperty]:
"""Convert NNC transmissibility data to three GridProperty instances.
Takes the NNC DataFrame produced by :meth:`xtgeo.Grid.get_transmissibilities`
and maps transmissibility values onto grid cells, producing one property per
direction (I, J, K).
For rows where DIRECTION contains ``"+"``, the transmissibility value is
placed in cell ``(I1, J1, K1)``. For rows where DIRECTION contains
``"-"``, the value is placed in cell ``(I2, J2, K2)``. Index columns
(I1, J1, K1, I2, J2, K2) are expected to be **1-based**.
If multiple rows map to the same cell and direction, the transmissibility
values are summed (parallel flow paths are additive).
Args:
grid: The xtgeo Grid that defines the geometry.
nnc_df: A DataFrame with at least columns
``I1, J1, K1, I2, J2, K2, T, DIRECTION``.
Returns:
A tuple ``(tranx_nnc, trany_nnc, tranz_nnc)`` of
:class:`xtgeo.GridProperty` instances named ``"TRANX_NNC"``,
``"TRANY_NNC"``, and ``"TRANZ_NNC"`` respectively.
Cells without an NNC value are set to ``-1.0``.
"""
required_cols = {"I1", "J1", "K1", "I2", "J2", "K2", "T", "DIRECTION"}
missing = required_cols - set(nnc_df.columns)
if missing:
raise ValueError(f"Missing required columns in nnc_df: {missing}")
ncol, nrow, nlay = grid.ncol, grid.nrow, grid.nlay
fill = -1.0
arrays = {
"I": np.zeros((ncol, nrow, nlay), dtype=np.float64),
"J": np.zeros((ncol, nrow, nlay), dtype=np.float64),
"K": np.zeros((ncol, nrow, nlay), dtype=np.float64),
}
touched = {
"I": np.zeros((ncol, nrow, nlay), dtype=bool),
"J": np.zeros((ncol, nrow, nlay), dtype=bool),
"K": np.zeros((ncol, nrow, nlay), dtype=bool),
}
direction_col = nnc_df["DIRECTION"].astype(str)
is_plus = direction_col.str.contains(r"\+", regex=True)
is_minus = direction_col.str.contains("-")
prefix_col = direction_col.str[0].str.upper()
for prefix in ("I", "J", "K"):
arr = arrays[prefix]
tch = touched[prefix]
mask_prefix = prefix_col == prefix
# "+" rows → use (I1, J1, K1)
sel_plus = nnc_df.loc[mask_prefix & is_plus]
if not sel_plus.empty:
ii = sel_plus["I1"].values.astype(int) - 1
jj = sel_plus["J1"].values.astype(int) - 1
kk = sel_plus["K1"].values.astype(int) - 1
tt = sel_plus["T"].values.astype(float)
valid = (
(ii >= 0)
& (ii < ncol)
& (jj >= 0)
& (jj < nrow)
& (kk >= 0)
& (kk < nlay)
)
np.add.at(arr, (ii[valid], jj[valid], kk[valid]), tt[valid])
tch[ii[valid], jj[valid], kk[valid]] = True
# "-" rows → use (I2, J2, K2)
sel_minus = nnc_df.loc[mask_prefix & is_minus]
if not sel_minus.empty:
ii = sel_minus["I2"].values.astype(int) - 1
jj = sel_minus["J2"].values.astype(int) - 1
kk = sel_minus["K2"].values.astype(int) - 1
tt = sel_minus["T"].values.astype(float)
valid = (
(ii >= 0)
& (ii < ncol)
& (jj >= 0)
& (jj < nrow)
& (kk >= 0)
& (kk < nlay)
)
np.add.at(arr, (ii[valid], jj[valid], kk[valid]), tt[valid])
tch[ii[valid], jj[valid], kk[valid]] = True
# Set untouched cells to fill value
arr[~tch] = fill
prop_names = {"I": "TRANX_NNC", "J": "TRANY_NNC", "K": "TRANZ_NNC"}
props = {}
for prefix in ("I", "J", "K"):
props[prefix] = xtgeo.GridProperty(
grid,
name=prop_names[prefix],
values=np.ma.array(arrays[prefix]),
discrete=False,
)
return props["I"], props["J"], props["K"]