Source code for fmu.tools.extract_grid_zone_tops_etc
"""Extract grid zone tops from wells."""
import pathlib
from typing import Any, Dict, Optional, Union
import numpy as np
import pandas as pd
import xtgeo
[docs]def extract_grid_zone_tops(
project: Optional[Any] = None,
well_list: Optional[list] = None,
logrun: str = "log",
trajectory: str = "Drilled trajectory",
gridzonelog: Optional[str] = None,
mdlogname: Optional[str] = None,
grid: Optional[str] = None,
zone_param: Optional[str] = None,
alias_file: Optional[str] = None,
rms_name: str = "RMS_WELL_NAME",
ecl_name: str = "ECLIPSE_WELL_NAME",
) -> pd.DataFrame:
"""
Function for extracting top and base from gridzones, both in TVD and MD.
A pandas dataframe will be returned.
Users can either input a pre-generated gridzonelog or a grid and a zone parameter
for computing the gridzonelog.
The function works both inside RMS and outside with file input. If input from files,
and a MD log is not present in the well a quasi md log will be computed and used.
"""
use_gridzonelog = gridzonelog is not None
if not use_gridzonelog:
if grid is not None and zone_param is not None:
if project is not None:
mygrid = xtgeo.grid_from_roxar(project, grid)
gridzones = xtgeo.gridproperty_from_roxar(project, grid, zone_param)
else:
mygrid = xtgeo.grid_from_file(grid)
gridzones = xtgeo.gridproperty_from_file(zone_param, grid=mygrid)
gridzones.name = "Zone"
else:
raise ValueError("Specify either 'gridzonelog' or 'grid' and 'zone_param")
dfs = []
if well_list is None:
well_list = []
for well in well_list:
try:
if project is not None:
xtg_well = xtgeo.well_from_roxar(
project,
str(well),
trajectory=trajectory,
logrun=logrun,
inclmd=True,
)
else:
xtg_well = xtgeo.well_from_file(str(well), mdlogname=mdlogname)
# quasi md log will be computed
xtg_well.geometrics()
except (ValueError, KeyError):
continue
# if no gridzonelog create one from the zone parameter
if not use_gridzonelog:
xtg_well.get_gridproperties(gridzones, mygrid)
gridzonelog = "Zone_model"
if xtg_well.dataframe[gridzonelog].isnull().values.all():
continue
# Set gridzonelog as zonelog and extract zonation tops from it
xtg_well.zonelogname = gridzonelog
dframe = xtg_well.get_zonation_points(top_prefix="", use_undef=True)
dframe.rename(
columns={
"Z_TVDSS": "TOP_TVD",
xtg_well.mdlogname: "TOP_MD",
"Zone": "ZONE_CODE",
"WellName": "WELL",
},
inplace=True,
)
# find deepest point in well while in grid
df_max = (
xtg_well.dataframe[["Z_TVDSS", xtg_well.mdlogname, gridzonelog]]
.dropna()
.sort_values(by=xtg_well.mdlogname)
)
# create base picks also
dframe["BASE_TVD"] = dframe["TOP_TVD"].shift(-1)
dframe["BASE_MD"] = dframe["TOP_MD"].shift(-1)
dframe.at[dframe.index[-1], "BASE_TVD"] = df_max.iloc[-1]["Z_TVDSS"]
dframe.at[dframe.index[-1], "BASE_MD"] = df_max.iloc[-1][xtg_well.mdlogname]
# adjust zone values to get correct zone information
dframe["ZONE_CODE"] = shift_zone_values(dframe["ZONE_CODE"].values.copy())
dframe["ZONE"] = (
dframe["ZONE_CODE"]
.map(xtg_well.get_logrecord(xtg_well.zonelogname))
.fillna("Outside")
)
dfs.append(dframe.drop(columns=["TopName", "Q_INCL", "Q_AZI"], errors="ignore"))
df = pd.concat(dfs)
if alias_file is not None:
well_dict = make_alias_dict(alias_file, rms_name, ecl_name)
df["WELL"] = df["WELL"].replace(well_dict)
return df
[docs]def shift_zone_values(zvals: np.ndarray) -> np.ndarray:
for idx, _zval in enumerate(zvals):
if idx == len(zvals) - 1:
continue
if zvals[idx] == zvals[idx + 1]:
zvals[idx + 1] = zvals[idx + 1] - 1
return zvals
[docs]def make_alias_dict(
alias_file: Union[str, pathlib.Path],
rms_name: str = "RMS_WELL_NAME",
ecl_name: str = "ECLIPSE_WELL_NAME",
) -> Dict[str, str]:
"""
Create a correspondance dictionary so that well_dict[ <RMS wellname> ]
= <Eclipse wellname>
"""
df = pd.read_csv(alias_file, index_col=rms_name)
well_dict = df.to_dict()
return well_dict[ecl_name]