from __future__ import annotations
import warnings
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Final
import numpy as np
import pandas as pd
import pyarrow as pa
import fmu.dataio as dio
from fmu.dataio._logging import null_logger
from fmu.dataio._models import InplaceVolumesResult
from fmu.dataio._models.fmu_results import standard_result
from fmu.dataio._models.fmu_results.enums import Classification, StandardResultName
from fmu.dataio.export import _enums
from fmu.dataio.export._decorators import experimental
from fmu.dataio.export._export_result import ExportResult, ExportResultItem
from fmu.dataio.export.rms._conditional_rms_imports import import_rms_package
from fmu.dataio.export.rms._utils import (
check_rmsapi_version,
get_rms_project_units,
load_global_config,
)
# RMS packages, resolved through the conditional import helper
rmsapi, rmsjobs = import_rms_package()

_logger: Final = null_logger(__name__)

# short aliases for the enums holding the standard column names
_VolumetricColumns = _enums.InplaceVolumes.VolumetricColumns
_TableIndexColumns = _enums.InplaceVolumes.TableIndexColumns

# Mapping from the column names of the RMS volumetric table to the FMU names.
# Volumetric columns get the zone they belong to as suffix: '_OIL', '_GAS' or
# '_TOTAL'.
_RENAME_COLUMNS_FROM_RMS: Final = {
    # realization and table index columns
    "Proj. real.": "REAL",
    "Zone": _TableIndexColumns.ZONE.value,
    "Segment": _TableIndexColumns.REGION.value,
    "Boundary": _TableIndexColumns.LICENSE.value,
    "Facies": _TableIndexColumns.FACIES.value,
    # oil zone volumes
    "BulkOil": f"{_VolumetricColumns.BULK.value}_OIL",
    "NetOil": f"{_VolumetricColumns.NET.value}_OIL",
    "PoreOil": f"{_VolumetricColumns.PORV.value}_OIL",
    "HCPVOil": f"{_VolumetricColumns.HCPV.value}_OIL",
    "STOIIP": f"{_VolumetricColumns.STOIIP.value}_OIL",
    "AssociatedGas": f"{_VolumetricColumns.ASSOCIATEDGAS.value}_OIL",
    # gas zone volumes
    "BulkGas": f"{_VolumetricColumns.BULK.value}_GAS",
    "NetGas": f"{_VolumetricColumns.NET.value}_GAS",
    "PoreGas": f"{_VolumetricColumns.PORV.value}_GAS",
    "HCPVGas": f"{_VolumetricColumns.HCPV.value}_GAS",
    "GIIP": f"{_VolumetricColumns.GIIP.value}_GAS",
    "AssociatedLiquid": f"{_VolumetricColumns.ASSOCIATEDOIL.value}_GAS",
    # totals
    "Bulk": f"{_VolumetricColumns.BULK.value}_TOTAL",
    "Net": f"{_VolumetricColumns.NET.value}_TOTAL",
    "Pore": f"{_VolumetricColumns.PORV.value}_TOTAL",
}
@dataclass
class _ExportVolumetricsRMS:
    """Export the report table of an RMS volumetrics job as 'inplace_volumes'.

    All data is prepared on construction: the global config is loaded, the
    settings of the volumetrics job are read from RMS, and the report table of
    the job is fetched and converted to the standard table format. Call
    ``export()`` to validate the table and export it through fmu-dataio.
    """

    # the RMS project; its 'volumetric_tables' are looked up by table name
    project: Any
    # name of the grid model that owns the volumetrics job
    grid_name: str
    # name of the volumetrics job to read the report table from
    volume_job_name: str

    def __post_init__(self) -> None:
        """Load the config, read the job settings and build the volume table."""
        _logger.debug("Process data, establish state prior to export.")
        self._config = load_global_config()
        self._volume_job = self._get_rms_volume_job_settings()
        self._volume_table_name = self._read_volume_table_name_from_job()
        self._dataframe = self._get_table_with_volumes()
        _logger.debug("Process data... DONE")

    @property
    def _standard_result(self) -> standard_result.InplaceVolumesStandardResult:
        """Standard result type for the exported data."""
        return standard_result.InplaceVolumesStandardResult(
            name=StandardResultName.inplace_volumes
        )

    @property
    def _classification(self) -> Classification:
        """Get default classification."""
        return Classification.restricted

    def _get_rms_volume_job_settings(self) -> dict:
        """Get the arguments of the volumetrics job from the RMS job API."""
        _logger.debug("RMS VOLJOB settings...")
        return rmsjobs.Job.get_job(
            owner=["Grid models", self.grid_name, "Grid"],
            type="Volumetrics",
            name=self.volume_job_name,
        ).get_arguments()

    def _read_volume_table_name_from_job(self) -> str:
        """
        Read the name of the report table from the volumetrics job settings.

        Raises:
            RuntimeError: If the job has no report settings or no report table
                name configured.
        """
        _logger.debug("Read volume table name from RMS...")
        report = self._volume_job.get("Report")
        # the report settings may come as a list, the first entry is used then
        if isinstance(report, list):
            report = report[0] if report else None

        # a missing 'Report' entry is handled like a missing table name, so the
        # user gets the instructive error below and not an AttributeError
        volume_table_name = report.get("ReportTableName") if report else None
        if not volume_table_name:
            raise RuntimeError(
                "You need to configure output to Report file: Report table "
                "in the volumetric job. Provide a table name and rerun the job."
            )

        _logger.debug("The volume table name is %s", volume_table_name)
        return volume_table_name

    def _get_table_with_volumes(self) -> pd.DataFrame:
        """
        Get a volumetric table from RMS converted into a pandas
        dataframe on standard format for the inplace_volumes standard result.
        """
        table = self._get_table_from_rms()
        table = self._convert_table_from_rms_to_legacy_format(table)
        return self._convert_table_from_legacy_to_standard_format(table)

    def _get_table_from_rms(self) -> pd.DataFrame:
        """Fetch volumetric table from RMS and convert to pandas dataframe"""
        _logger.debug("Read values and convert to pandas dataframe...")
        return pd.DataFrame.from_dict(
            self.project.volumetric_tables[self._volume_table_name]
            .get_data_table()
            .to_dict()
        )

    @staticmethod
    def _convert_table_from_rms_to_legacy_format(table: pd.DataFrame) -> pd.DataFrame:
        """Rename columns to legacy naming standard and drop REAL column if present."""
        _logger.debug("Converting dataframe from RMS to legacy format...")
        return table.rename(columns=_RENAME_COLUMNS_FROM_RMS).drop(
            columns="REAL", errors="ignore"
        )

    @staticmethod
    def _compute_water_zone_volumes_from_totals(table: pd.DataFrame) -> pd.DataFrame:
        """
        Calculate 'water' zone volumes by subtracting HC-zone volumes from 'Total'
        volumes which represents the entire zone. Total volumes are removed after
        'water' zone volumes have been added to the table.

        The water zone columns are added to the input table in place; the
        returned table is a new one without the 'Total' columns.

        Raises:
            RuntimeError: If the table has no '_TOTAL' columns.
        """
        _logger.debug("Computing water volumes from Totals...")

        total_suffix = "_TOTAL"
        total_columns = [col for col in table.columns if col.endswith(total_suffix)]

        if not total_columns:
            raise RuntimeError(
                "Found no 'Totals' volumes in the table. Please ensure 'Totals' "
                "are reported and rerun the volumetric job before export."
            )

        for total_col in total_columns:
            # strip the trailing suffix only, other parts of the name are kept
            volumetric_col = total_col[: -len(total_suffix)]

            water_zone_col = f"{volumetric_col}_WATER"
            oil_zone_col = f"{volumetric_col}_OIL"
            gas_zone_col = f"{volumetric_col}_GAS"

            # first set water zone data equal to the Total
            # then subtract data from the oil/gas zone
            table[water_zone_col] = table[total_col]

            if oil_zone_col in table:
                table[water_zone_col] -= table[oil_zone_col]

            if gas_zone_col in table:
                table[water_zone_col] -= table[gas_zone_col]

        return table.drop(columns=total_columns)

    @staticmethod
    def _add_missing_columns_to_table(table: pd.DataFrame) -> pd.DataFrame:
        """Add columns with nan values if not present in table."""
        _logger.debug("Add table index columns to table if missing...")
        for col in _enums.InplaceVolumes.table_columns():
            if col not in table:
                table[col] = np.nan
        return table

    @staticmethod
    def _set_net_equal_to_bulk_if_missing_in_table(table: pd.DataFrame) -> pd.DataFrame:
        """
        Add a NET column to the table equal to the BULK column if NET is missing,
        since the absence implies a net-to-gross ratio of 1.

        If BULK is missing as well the table is returned unchanged, so that the
        missing calculations are reported by the table validation rather than as
        a KeyError here.
        """
        net_col = _VolumetricColumns.NET.value
        bulk_col = _VolumetricColumns.BULK.value
        if net_col not in table and bulk_col in table:
            _logger.debug("NET column missing, setting NET equal BULK...")
            table[net_col] = table[bulk_col]
        return table

    @staticmethod
    def _set_table_column_order(table: pd.DataFrame) -> pd.DataFrame:
        """Set the column order in the table."""
        _logger.debug("Settting the table column order...")
        return table[_enums.InplaceVolumes.table_columns()]

    @staticmethod
    def _transform_and_add_fluid_column_to_table(
        table: pd.DataFrame, table_index: list[str]
    ) -> pd.DataFrame:
        """
        Transformation of a dataframe containing fluid-specific column data into a
        standardized format with unified column names, e.g. 'BULK_OIL' and 'PORV_OIL'
        are renamed into 'BULK' and 'PORV' columns. To separate the data an additional
        FLUID column is added that indicates the type of fluid the row represents.

        Args:
            table: Table with fluid-specific columns, i.e. columns that end with
                '_<FLUID>' where <FLUID> is the upper case fluid name.
            table_index: The index columns of the table. They are kept as they
                are for each fluid.

        Returns:
            The tables of the fluids stacked on top of each other (gas, oil,
            water), or an empty dataframe if no fluid-specific columns exist.
        """
        tables = []
        for fluid in (
            _enums.InplaceVolumes.Fluid.gas.value,
            _enums.InplaceVolumes.Fluid.oil.value,
            _enums.InplaceVolumes.Fluid.water.value,
        ):
            fluid_suffix = f"_{fluid.upper()}"
            fluid_columns = [col for col in table.columns if col.endswith(fluid_suffix)]
            if not fluid_columns:
                continue

            fluid_table = table[table_index + fluid_columns].copy()

            # Drop the fluid suffix from the fluid columns to get standard names.
            # Only the trailing suffix is cut off, and the index columns are
            # left as they are.
            fluid_table.columns = table_index + [
                col[: -len(fluid_suffix)] for col in fluid_columns
            ]

            # add the fluid as column entry instead
            fluid_table[_TableIndexColumns.FLUID.value] = fluid

            tables.append(fluid_table)

        return pd.concat(tables, ignore_index=True) if tables else pd.DataFrame()

    def _convert_table_from_legacy_to_standard_format(
        self, table: pd.DataFrame
    ) -> pd.DataFrame:
        """
        Convert the table from legacy to standard format for the 'inplace_volumes'
        standard result. The standard format has a fluid column, and all table_index
        and volumetric columns are present with a standard order in the table.
        """
        # only the index columns present in the legacy table take part in the
        # transformation, the remaining ones are added as nan columns below
        table_index = [
            col for col in _enums.InplaceVolumes.index_columns() if col in table
        ]
        table = self._compute_water_zone_volumes_from_totals(table)
        table = self._transform_and_add_fluid_column_to_table(table, table_index)
        table = self._set_net_equal_to_bulk_if_missing_in_table(table)
        table = self._add_missing_columns_to_table(table)
        return self._set_table_column_order(table)

    def _is_column_missing_in_table(self, column: str) -> bool:
        """Return True if the column is absent from the final dataframe or has
        no values (all entries are nan)."""
        if column not in self._dataframe:
            return True
        # bool() since pandas returns a numpy bool here
        return bool(self._dataframe[column].isna().all())

    def _validate_table(self) -> None:
        """
        Validate that the final table with volumes is according to the standard
        defined for the inplace_volumes standard result. The table should have the
        required index and value columns, and at least one of the main types 'oil' or
        'gas'.

        Raises:
            RuntimeError: If a required index column is missing, if neither oil
                nor gas is present in the fluid column, or if required
                calculations are missing.
        """
        _logger.debug("Validating the dataframe...")

        # check that all required index columns are present
        for col in _enums.InplaceVolumes.required_index_columns():
            if self._is_column_missing_in_table(col):
                raise RuntimeError(
                    f"Required index column {col} is missing in the volumetric table. "
                    "Please update and rerun the volumetric job before export."
                )

        # compare against the same enum values that were written to the fluid
        # column when the table was transformed
        fluids = self._dataframe[_TableIndexColumns.FLUID.value].values
        has_oil = _enums.InplaceVolumes.Fluid.oil.value in fluids
        has_gas = _enums.InplaceVolumes.Fluid.gas.value in fluids

        # check that one of oil and gas fluids are present
        if not (has_oil or has_gas):
            raise RuntimeError(
                "One or both 'oil' and 'gas' needs to be selected as 'Main types' "
                "in the volumetric job. Please update and rerun the volumetric job "
                "before export."
            )

        # check that all required value columns are present
        missing_calculations = [
            col
            for col in _enums.InplaceVolumes.required_value_columns()
            if self._is_column_missing_in_table(col)
        ]

        # STOIIP and GIIP are only required when their fluid is in the table
        if has_oil and self._is_column_missing_in_table(
            _VolumetricColumns.STOIIP.value
        ):
            missing_calculations.append(_VolumetricColumns.STOIIP.value)

        if has_gas and self._is_column_missing_in_table(_VolumetricColumns.GIIP.value):
            missing_calculations.append(_VolumetricColumns.GIIP.value)

        if missing_calculations:
            raise RuntimeError(
                f"Required calculations {missing_calculations} are missing "
                f"in the volumetric table {self._volume_table_name}. Please update and "
                "rerun the volumetric job before export."
            )

        # validate the rows against the result model, with nan given as None
        df = self._dataframe.replace(np.nan, None).to_dict(orient="records")
        InplaceVolumesResult.model_validate(df)

    def _export_volume_table(self) -> ExportResult:
        """
        Do the actual volume table export using dataio setup.

        The unit is set to 'm3' if the RMS project units are 'metric' and to
        'ft3' otherwise.

        Returns:
            An export result with one item holding the absolute export path.
        """
        edata = dio.ExportData(
            config=self._config,
            content="volumes",
            unit="m3" if get_rms_project_units(self.project) == "metric" else "ft3",
            vertical_domain="depth",
            domain_reference="msl",
            subfolder="volumes",
            classification=self._classification,
            name=self.grid_name,
            rep_include=False,
            table_index=_enums.InplaceVolumes.index_columns(),
        )

        volume_table = pa.Table.from_pandas(self._dataframe)

        # export the volume table with standard result info in the metadata
        absolute_export_path = edata._export_with_standard_result(
            volume_table,
            standard_result=self._standard_result,
        )

        _logger.debug("Volume result to: %s", absolute_export_path)
        return ExportResult(
            items=[
                ExportResultItem(
                    absolute_path=Path(absolute_export_path),
                )
            ],
        )

    def export(self) -> ExportResult:
        """Validate and export the volume table."""
        self._validate_table()
        return self._export_volume_table()
[docs]
@experimental
def export_inplace_volumes(
    project: Any,
    grid_name: str,
    volume_job_name: str,
) -> ExportResult:
    """Simplified interface when exporting volume tables (and associated data) from RMS.

    The RMS API version is checked against a minimum version of 1.7 before the
    table of the volume job is validated and exported.

    Args:
        project: The 'magic' project variable in RMS.
        grid_name: Name of 3D grid model in RMS.
        volume_job_name: Name of the volume job.

    Returns:
        The export result, whose items hold the absolute path of the exported
        volume table.

    Note:
        This function is experimental and may change in future versions.

    Examples:
        Example usage in an RMS script::

            from fmu.dataio.export.rms import export_inplace_volumes

            export_results = export_inplace_volumes(project, "Geogrid", "geogrid_volumes")

            for result in export_results.items:
                print(f"Output volumes to {result.absolute_path}")

    """  # noqa: E501 line too long
    check_rmsapi_version(minimum_version="1.7")

    exporter = _ExportVolumetricsRMS(
        project=project,
        grid_name=grid_name,
        volume_job_name=volume_job_name,
    )
    return exporter.export()
# Keep the old name for now, but do not log it (it will be removed soon, as we
# expect close to zero usage so far).
[docs]
def export_rms_volumetrics(*args, **kwargs) -> ExportResult:  # type: ignore
    """Deprecated alias of export_inplace_volumes.

    Issues a FutureWarning and passes all arguments on to
    export_inplace_volumes, whose result is returned.
    """
    deprecation_message = (
        "export_rms_volumetrics is deprecated and will be removed in a future release. "
        "Use export_inplace_volumes instead."
    )
    # stacklevel=2 attributes the warning to the caller of this function
    warnings.warn(deprecation_message, category=FutureWarning, stacklevel=2)
    return export_inplace_volumes(*args, **kwargs)