from __future__ import annotations
import copy
import warnings
from dataclasses import dataclass, field
from pathlib import Path
from typing import ClassVar, Final, Literal, Optional, Union
from pydantic import ValidationError
from fmu.dataio._models.fmu_results import fields
from fmu.dataio._models.fmu_results.enums import FMUContext
from . import _utils, dataio, types
from ._logging import null_logger
from ._metadata import ObjectMetadataExport
from .exceptions import InvalidMetadataError
from .providers.objectdata._provider import objectdata_provider_factory
logger: Final = null_logger(__name__)
# ######################################################################################
# AggregatedData
#
# The AggregatedData class is used for making aggregations from existing data that
# already have valid metadata, i.e. data made by ExportData.
#
# Hence this is actually quite different from, and simpler than, ExportData(), which
# needs a lot of info such as FmuProvider, FileProvider, ObjectData etc. Here most of
# these are already known from the input.
#
# For aggregations, the id is normally given as an argument by the external process,
# which makes it possible to give a group of aggregations the same id (see the
# sketch below).
#
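# Example (a hedged sketch): giving two aggregations the same explicit id, so
# they can be grouped together. ``metas`` and the surfaces are hypothetical
# stand-ins for the input metadata list and the aggregated objects:
#
#     common_id = "some-common-id"  # provided by the external process
#     AggregatedData(source_metadata=metas, operation="mean",
#                    aggregation_id=common_id).export(mean_surface)
#     AggregatedData(source_metadata=metas, operation="std",
#                    aggregation_id=common_id).export(std_surface)
#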
# ######################################################################################
@dataclass
class AggregatedData:
"""Instantate AggregatedData object.
Args:
aggregation_id: Give an explicit ID for the aggregation. If None, an ID will be
made based on existing realization uuids.
casepath: The root folder to the case, default is None. If None, the casepath
is derived from the first input metadata paths (cf. ``source_metadata``) if
possible. If given explicitly, the physical casepath folder must exist in
advance, otherwise a ValueError will be raised.
source_metadata: A list of individual metadata dictionaries, coming from the
valid metadata per input element that forms the aggregation.
operation: A string that describes the operation, e.g. "mean". This is
mandatory and there is no default.
tagname: Additional name, used as part of the file name.
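
Example:
    A minimal sketch, where ``metas`` (a list of valid per-realization
    metadata dicts) and ``mean_surface`` (the aggregated object) are
    hypothetical stand-ins::

        aggdata = AggregatedData(
            source_metadata=metas,
            operation="mean",
            name="myaggregation",
        )
        aggdata.export(mean_surface)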
"""
# class variable(s)
meta_format: ClassVar[Optional[Literal["yaml", "json"]]] = None # deprecated
# instance
aggregation_id: Optional[str] = None
casepath: Optional[Union[str, Path]] = None
source_metadata: list = field(default_factory=list)
name: str = ""
operation: str = ""
tagname: str = ""
verbosity: str = "DEPRECATED" # keep for while
_metadata: ObjectMetadataExport = field(init=False)
_metafile: Path = field(default_factory=Path, init=False)
def __post_init__(self) -> None:
if self.verbosity != "DEPRECATED":
warnings.warn(
"Using the 'verbosity' key is now deprecated and will have no "
"effect and will be removed in near future. Please remove it from the "
"argument list. Set logging level from client script in the standard "
"manner instead.",
UserWarning,
)
if self.meta_format:
warnings.warn(
"The 'meta_format' option is deprecated and should be removed. "
"Metadata will only be exported in yaml format.",
UserWarning,
)
@staticmethod
def _generate_aggr_uuid(uuids: list[str]) -> str:
"""Unless aggregation_id; use existing UUIDs to generate a new UUID."""
return str(_utils.uuid_from_string("".join(sorted(uuids))))
def _update_settings(self, newsettings: dict) -> None:
"""Update instance settings (properties) from other routines."""
logger.info("Try new settings %s", newsettings)
# derive legal input from dataclass signature
annots = getattr(self, "__annotations__", {})
legals = {key: val for key, val in annots.items() if not key.startswith("_")}
for setting, value in newsettings.items():
if dataio._validate_variable(setting, value, legals):
setattr(self, setting, value)
logger.info("New setting OK for %s", setting)
def _construct_filename(self, template: dict) -> tuple[Path, Path | None]:
"""Construct the paths/filenames for aggregated data.
These filenames are constructed a bit differently than in a forward job, since
we do not know which folder we 'are in' when doing aggregations. We could
possibly also be in a cloud setting.
Hence we use the first input realization as template, e.g.:
file:
relative_path: realization-33/iter-0/share/results/maps/x.gri
absolute_path: /scratch/f/case/realization-33/iter-0/share/results/maps/x.gri
And from that we derive/compose the relative and absolute path for the
aggregated data:
file:
relative_path: iter-0/share/results/maps/aggr.gri
absolute_path: /scratch/f/case/iter-0/share/results/maps/aggr.gri
The trick is to replace 'realization-*' with nothing and create a new file
name.
-----
However, there is also the scenario where absolute_path is missing (e.g. all
input realizations are made directly in a cloud setting), and we need to
account for that:
infile:
relative_path: realization-33/iter-0/share/results/maps/x.gri
absolute_path: none
file:
relative_path: iter-0/share/results/maps/aggr.gri
absolute_path: none
-----
Finally, a user-given casepath (casepath is not None) should replace the
current root part in the files, like this:
infile:
relative_path: realization-33/iter-0/share/results/maps/x.gri
absolute_path: /scratch/f/case/realization-33/iter-0/share/results/maps/x.gri
casepath = /scratch/f/othercase
result:
relative_path: iter-0/share/results/maps/aggr.gri
absolute_path: /scratch/f/othercase/iter-0/share/results/maps/aggr.gri
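
The file stem itself is composed from ``name``, ``operation`` and ``tagname``
(see the code below); note that when ``name`` is given, ``operation`` is not
appended, e.g. with template stem ``x``::

    name="",    operation="mean", tagname="depth" -> x--mean--depth.gri
    name="all", operation="mean", tagname="depth" -> all--depth.gri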
"""
logger.info("Construct file name for the aggregation...")
realiname = template["fmu"]["realization"]["name"]
relpath = template["file"]["relative_path"]
if template["file"].get("absolute_path", None):
abspath = template["file"]["absolute_path"]
else:
abspath = None
logger.info("First input realization relpath is: %s ", relpath)
logger.info("First input realization abspath is: %s ", abspath)
if self.casepath:
casepath = Path(self.casepath)
if not casepath.exists():
raise ValueError(
f"The given casepath {casepath} does not exist. "
"It must exist in advance!"
)
abspath = str(casepath / relpath)
relpath = relpath.replace(realiname + "/", "")
relpath = Path(relpath)
if abspath:
abspath = abspath.replace(realiname + "/", "")
abspath = Path(abspath)
suffix = relpath.suffix
stem = relpath.stem
usename = stem + "--" + self.operation
if not self.name:
warnings.warn("Input name is not given, will assume <usename>", UserWarning)
else:
usename = self.name
if self.tagname:
usename = usename + "--" + self.tagname
relname = (relpath.parent / usename).with_suffix(suffix)
absname = None
if abspath:
absname = (abspath.parent / usename).with_suffix(suffix)
logger.info("New relpath is: %s ", relname)
logger.info("New abspath is: %s ", absname)
return relname, absname
def _set_metadata(
self,
obj: types.Inferrable,
real_ids: list[int],
uuids: list[str],
compute_md5: bool = True,
) -> None:
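"""Generate and validate the aggregation metadata, using the first entry in
``source_metadata`` as a template."""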
logger.info(
"self.aggregation is %s (%s)",
self.aggregation_id,
type(self.aggregation_id),
)
if not compute_md5:
warnings.warn(
"Using the 'compute_md5=False' option to prevent an MD5 checksum "
"from being computed is now deprecated. This option has no longer "
"an effect and will be removed in the near future.",
UserWarning,
)
if self.aggregation_id is None:
self.aggregation_id = self._generate_aggr_uuid(uuids)
elif not isinstance(self.aggregation_id, str):
raise ValueError("aggregation_id must be a string")
if not self.operation:
raise ValueError("The 'operation' key has no value")
# use first as template
template = copy.deepcopy(self.source_metadata[0])
relpath, abspath = self._construct_filename(template)
# fmu.realization shall not be used
del template["fmu"]["realization"]
template["fmu"]["aggregation"] = {}
template["fmu"]["aggregation"]["operation"] = self.operation
template["fmu"]["aggregation"]["realization_ids"] = real_ids
template["fmu"]["aggregation"]["id"] = self.aggregation_id
# fmu.context.stage should be 'iteration'
template["fmu"]["context"]["stage"] = FMUContext.iteration.value
# next, the new object will trigger update of: 'file', 'data' (some fields) and
# 'tracklog'.
# Make a temporary config from the template so that a temporary ExportData
# can be initialized without warnings, giving us access to the
# objectdata_provider.
config = {
"access": { # only asset and classification should be picked up
"asset": template["access"]["asset"],
"classification": template["access"]["classification"],
},
"masterdata": template["masterdata"],
"model": template["fmu"]["model"],
}
dummy_content = "depth" # will not be used, but will silence warning
etemp = dataio.ExportData(config=config, name=self.name, content=dummy_content)
objdata = objectdata_provider_factory(obj=obj, dataio=etemp)
template["tracklog"] = [fields.Tracklog.initialize()[0]]
template["file"] = {
"relative_path": str(relpath),
"absolute_path": str(abspath) if abspath else None,
"checksum_md5": _utils.compute_md5_from_objdata(objdata),
}
# data section
if self.name:
template["data"]["name"] = self.name
if self.tagname:
template["data"]["tagname"] = self.tagname
if bbox := objdata.get_bbox():
template["data"]["bbox"] = bbox
try:
self._metadata = ObjectMetadataExport.model_validate(template)
except ValidationError as err:
raise InvalidMetadataError(
f"The existing metadata for the aggregated data is invalid. "
f"Detailed information: \n{str(err)}"
) from err
# ==================================================================================
# Public methods:
# ==================================================================================
def export(self, obj: types.Inferrable, **kwargs: object) -> str:
"""Export aggregated file with metadata to file.
Args:
obj: Aggregated object to export, e.g. an XTGeo RegularSurface
**kwargs: See AggregatedData() arguments; initial settings will be
overridden by settings given here.
Returns:
String: full path to the exported item.
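
Example::

    # A minimal sketch; ``metas`` and ``mean_surface`` are hypothetical
    # stand-ins for the input metadata list and the aggregated object:
    aggdata = AggregatedData(source_metadata=metas, casepath="/scratch/f/case")
    fullpath = aggdata.export(mean_surface, operation="mean")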
"""
self._update_settings(kwargs)
metadata = self.generate_metadata(obj, compute_md5=True)
abspath = metadata["file"].get("absolute_path", None)
if not abspath:
raise OSError(
"The absolute_path is None, hence no export is possible. "
"Use the ``casepath`` key to provide a valid absolute path."
)
outfile = Path(abspath)
outfile.parent.mkdir(parents=True, exist_ok=True)
metafile = outfile.parent / ("." + str(outfile.name) + ".yml")
logger.info("Export to file and export metadata file.")
_utils.export_file(obj, outfile)
_utils.export_metadata_file(metafile, metadata)
logger.info("Actual file is: %s", outfile)
logger.info("Metadata file is: %s", metafile)
return str(outfile)