RDDMS Client

The class RDDMSClient is an asynchronous client with high-level methods for interacting with the open-etp-server. To set up a connection, use rddms_connect as an awaitable, a context manager, or an asynchronous generator, each of which provides an instance of RDDMSClient.

RDDMSClientSync is a synchronous alternative to RDDMSClient. It avoids the need for await and async at the cost of lower efficiency and slightly less flexibility. However, for many use cases this will be more than enough.

Setting up a connection

rddms_io.client.rddms_connect

rddms_connect(
    uri: str,
    data_partition_id: str | None = None,
    authorization: str | SecretStr | None = None,
    etp_timeout: float | None = None,
    max_message_size: float = 2**20,
    use_compression: bool = True,
)

Connect to an RDDMS server via ETP.

This class can act as:

  1. A context manager handling setup and tear-down of the connection.
  2. An asynchronous iterator which can be used to persistently retry to connect if the websockets connection drops.
  3. An awaitable connection that must be manually closed by the user.

See below for examples of all three cases.
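How one object can be both awaited and used in an async with-statement may be easier to see in isolation. The following is a minimal sketch with toy classes: ToyClient and toy_connect are hypothetical stand-ins, not part of rddms_io, and no network connection is made. It mirrors the pattern where awaiting the object delegates to __aenter__.

```python
import asyncio


class ToyClient:
    """Hypothetical stand-in for RDDMSClient; it only tracks open/closed."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


class toy_connect:
    """Hypothetical connector: awaitable and an async context manager."""

    def __await__(self):
        # Awaiting the object delegates to `__aenter__`. The caller then
        # owns the client and must close it manually.
        return self.__aenter__().__await__()

    async def __aenter__(self) -> ToyClient:
        self.client = ToyClient()
        return self.client

    async def __aexit__(self, exc_type, exc_value, traceback) -> None:
        # Leaving the `async with`-block closes the client.
        await self.client.close()


async def main() -> tuple[bool, bool]:
    async with toy_connect() as managed:
        pass  # The client is open inside the block.

    manual = await toy_connect()
    closed_before_manual_close = manual.closed
    await manual.close()

    return managed.closed, closed_before_manual_close


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The context manager closes its client on exit, while the awaited client stays open until close is called.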

PARAMETER DESCRIPTION
uri

The uri to the RDDMS server. This should be the uri to a websockets endpoint to an ETP server.

TYPE: str

data_partition_id

The data partition id used when connecting to the OSDU open-etp-server in multi-partition mode. Default is None.

TYPE: str | None DEFAULT: None

authorization

Bearer token used for authenticating to the RDDMS server. This token should be of the form "Bearer 1234...". Default is None.

TYPE: str | SecretStr | None DEFAULT: None

etp_timeout

The timeout in seconds for when to stop waiting for a message from the server. Setting it to None will persist the connection indefinitely. Default is None.

TYPE: float | None DEFAULT: None

max_message_size

The maximum number of bytes for a single websockets message. Default is 2**20 corresponding to 1 MiB.

TYPE: float DEFAULT: 2 ** 20

use_compression

Flag to toggle if compression of the messages should be applied. So far the client (and the server) only support compression with gzip. Default is True, meaning compression is applied.

TYPE: bool DEFAULT: True
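The authorization parameter expects the full header value, including the "Bearer " prefix. A minimal sketch of normalizing a raw access token into that form is shown here. The helper as_bearer is hypothetical and not part of rddms_io.

```python
def as_bearer(token: str) -> str:
    """Prefix a raw access token with "Bearer " unless it is already there."""
    token = token.strip()
    if token.lower().startswith("bearer "):
        return token
    return f"Bearer {token}"
```

The result can be passed as the authorization argument, optionally wrapped in a SecretStr to keep it out of logs and tracebacks.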

Examples:

An example of connecting to an RDDMS server using rddms_connect as a context manager is:

async with rddms_connect(...) as rddms_client:
    ...

In this case the closing message is sent and the websockets connection is closed once the program exits the context manager.

To persist a connection if the websockets connection is dropped (for any reason), use rddms_connect as an asynchronous generator, viz.:

import websockets

async for rddms_client in rddms_connect(...):
    try:
        ...
    except websockets.ConnectionClosed:
        continue

    # Include `break` to avoid re-running the whole block if the
    # iteration runs without any errors.
    break

Note that in this case the whole body of the loop is re-run from the start if the websockets connection is dropped, and also if the iteration completes normally. Therefore, make sure to include a break after the try-block (as in the example above).
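The control flow of the retry loop can be tried out without a server. In this sketch, toy_connect and ConnectionDropped are hypothetical stand-ins for rddms_connect and websockets.ConnectionClosed. The connection is made to "drop" on the first two attempts.

```python
import asyncio


class ConnectionDropped(Exception):
    """Hypothetical stand-in for `websockets.ConnectionClosed`."""


async def toy_connect():
    """Hypothetical connector yielding a new attempt number per connection."""
    attempt = 0
    while True:
        attempt += 1
        yield attempt


async def main() -> list[str]:
    log = []
    async for attempt in toy_connect():
        try:
            log.append(f"attempt {attempt}: start")
            if attempt < 3:
                # Simulate the connection dropping on the first two attempts.
                raise ConnectionDropped
            log.append(f"attempt {attempt}: done")
        except ConnectionDropped:
            # Ask the generator for a fresh connection and start over.
            continue

        # Without this `break` the loop would reconnect and redo the work.
        break

    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Every attempt restarts the work from the top, so the code inside the try-block should be safe to run more than once.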

The third option is to set up a connection via await and then manually close the connection once done:

rddms_client = await rddms_connect(...)
...
await rddms_client.close()
See Also

pyetp.client.etp_connect: The rddms_connect-class is a thin wrapper around etp_connect.

Source code in src/rddms_io/client.py
def __init__(
    self,
    uri: str,
    data_partition_id: str | None = None,
    authorization: str | SecretStr | None = None,
    etp_timeout: float | None = None,
    max_message_size: float = 2**20,
    use_compression: bool = True,
) -> None:
    self.uri = uri
    self.data_partition_id = data_partition_id

    if isinstance(authorization, SecretStr):
        self.authorization = authorization
    else:
        self.authorization = SecretStr(authorization)

    self.etp_timeout = etp_timeout
    self.max_message_size = max_message_size
    self.use_compression = use_compression

uri instance-attribute

uri = uri

data_partition_id instance-attribute

data_partition_id = data_partition_id

authorization instance-attribute

authorization = authorization

etp_timeout instance-attribute

etp_timeout = etp_timeout

max_message_size instance-attribute

max_message_size = max_message_size

use_compression instance-attribute

use_compression = use_compression

__await__

__await__() -> RDDMSClient
Source code in src/rddms_io/client.py
def __await__(self) -> RDDMSClient:
    return self.__aenter__().__await__()

__aenter__ async

__aenter__() -> RDDMSClient
Source code in src/rddms_io/client.py
async def __aenter__(self) -> RDDMSClient:
    etp_client = await etp_connect(
        uri=self.uri,
        data_partition_id=self.data_partition_id,
        authorization=self.authorization,
        etp_timeout=self.etp_timeout,
        max_message_size=self.max_message_size,
        use_compression=self.use_compression,
    )
    self.rddms_client = RDDMSClient(etp_client)

    return self.rddms_client

__aexit__ async

__aexit__(
    exc_type: Type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None
Source code in src/rddms_io/client.py
async def __aexit__(
    self,
    exc_type: typing.Type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None:
    return await self.rddms_client.close()

__aiter__ async

__aiter__() -> AsyncGenerator[RDDMSClient]
Source code in src/rddms_io/client.py
async def __aiter__(self) -> AsyncGenerator[RDDMSClient]:
    async for etp_client in etp_connect(
        uri=self.uri,
        data_partition_id=self.data_partition_id,
        authorization=self.authorization,
        etp_timeout=self.etp_timeout,
        max_message_size=self.max_message_size,
        use_compression=self.use_compression,
    ):
        yield RDDMSClient(etp_client)

Client implementation

rddms_io.client.RDDMSClient

RDDMSClient(etp_client: ETPClient)

Client using ETP to communicate with an RDDMS (Reservoir Domain Data Management Services) server. It is specifically tailored towards the OSDU open-etp-server and made with the intention to make it easier to interact with RDDMS by exposing ergonomic user-facing functions.

Notes

The client is meant to be set up via rddms_connect.

PARAMETER DESCRIPTION
etp_client

An instance of ETPClient.

TYPE: ETPClient

Source code in src/rddms_io/client.py
def __init__(self, etp_client: ETPClient) -> None:
    self.etp_client = etp_client

etp_client instance-attribute

etp_client = etp_client

close async

close() -> None

Method used for manual closing of the ETP-connection when the client has been set up outside a context manager. For example, if the client has been made via an await-statement then this method should be used to stop the connection.

Examples:

rddms_client = await rddms_connect(...)
...
await rddms_client.close()
Source code in src/rddms_io/client.py
async def close(self) -> None:
    """
    Method used for manual closing of the ETP-connection when the client
    has been set up outside a context manager. For example, if the client
    has been made via an `await`-statement then this method should be used
    to stop the connection.

    Examples
    --------

        rddms_client = await rddms_connect(...)
        ...
        await rddms_client.close()
    """
    await self.etp_client.close()

list_dataspaces async

list_dataspaces(
    store_last_write_filter: datetime | int | None = None,
) -> list[Dataspace]

Method used to list all dataspaces on the ETP-server.

PARAMETER DESCRIPTION
store_last_write_filter

A parameter that can be used to limit the results to only include dataspaces that were written to after the time specified in the filter. The default is None, meaning all dataspaces will be included.

TYPE: datetime | int | None DEFAULT: None

RETURNS DESCRIPTION
list[Dataspace]

A list of ETP Dataspace-data objects. See section 23.43.10 of the ETP v1.2 standards documentation for an accurate description of the different fields.
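When store_last_write_filter is given as an integer, it is a timestamp in microseconds. A minimal sketch of the same conversion the method applies to datetime objects follows. The helper name to_microseconds is hypothetical.

```python
import datetime


def to_microseconds(moment: datetime.datetime) -> int:
    """Convert a datetime to an integer timestamp in microseconds."""
    return int(moment.timestamp() * 1e6)


# Prefer timezone-aware datetimes: `datetime.timestamp()` interprets naive
# datetimes as local time.
one_day_after_epoch = datetime.datetime(1970, 1, 2, tzinfo=datetime.timezone.utc)
print(to_microseconds(one_day_after_epoch))  # 86400000000
```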

Source code in src/rddms_io/client.py
async def list_dataspaces(
    self, store_last_write_filter: datetime.datetime | int | None = None
) -> list[Dataspace]:
    """
    Method used to list all dataspaces on the ETP-server.

    Parameters
    ----------
    store_last_write_filter
        A parameter that can be used to limit the results to only include
        dataspaces that were written to after the time specified in the
        filter. The default is `None`, meaning all dataspaces will be
        included.

    Returns
    -------
    list[Dataspace]
        A list of ETP `Dataspace`-data objects. See section 23.43.10 of the
        ETP v1.2 standards documentation for an accurate description of the
        different fields.
    """

    if isinstance(store_last_write_filter, datetime.datetime):
        # Convert `datetime`-object to a microsecond resolution timestamp.
        store_last_write_filter = int(store_last_write_filter.timestamp() * 1e6)

    responses = await self.etp_client.send_and_recv(
        GetDataspaces(store_last_write_filter=store_last_write_filter)
    )
    parse_and_raise_response_errors(
        responses,
        GetDataspacesResponse,
        "RDDMSClient.list_dataspaces",
    )
    dataspaces = [ds for response in responses for ds in response.dataspaces]
    return dataspaces

delete_dataspace async

delete_dataspace(dataspace_uri: DataspaceURI | str) -> None

Method deleting a dataspace.

PARAMETER DESCRIPTION
dataspace_uri

The ETP dataspace uri, or path, for the dataspace to delete. If it is a dataspace path (of the form 'foo/bar') it will be converted to the dataspace uri "eml:///dataspace('foo/bar')".

TYPE: DataspaceURI | str
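The relation between a dataspace path and a dataspace uri can be illustrated with a small stand-alone function. This is only a sketch of the string format described above. The library performs the conversion with DataspaceURI.from_any_etp_uri, which may accept more input forms than the hypothetical to_dataspace_uri shown here.

```python
def to_dataspace_uri(path_or_uri: str) -> str:
    """Return a dataspace uri for a plain path; pass existing uris through."""
    if path_or_uri.startswith("eml:///"):
        return path_or_uri
    return f"eml:///dataspace('{path_or_uri}')"


print(to_dataspace_uri("foo/bar"))  # eml:///dataspace('foo/bar')
```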

Source code in src/rddms_io/client.py
async def delete_dataspace(self, dataspace_uri: DataspaceURI | str) -> None:
    """
    Method deleting a dataspace.

    Parameters
    ----------
    dataspace_uri
        The ETP dataspace uri, or path, for the dataspace to delete. If it
        is a dataspace path (of the form `'foo/bar'`) it will be converted
        to the dataspace uri `"eml:///dataspace('foo/bar')"`.
    """
    dataspace_uri = str(DataspaceURI.from_any_etp_uri(dataspace_uri))
    responses = await self.etp_client.send_and_recv(
        DeleteDataspaces(uris={dataspace_uri: dataspace_uri})
    )
    parse_and_raise_response_errors(
        responses,
        DeleteDataspacesResponse,
        "RDDMSClient.delete_dataspace",
    )
    assert any([dataspace_uri in response.success for response in responses])

create_dataspace async

create_dataspace(
    dataspace_uri: str | DataspaceURI,
    legal_tags: list[str] = [],
    other_relevant_data_countries: list[str] = [],
    owners: list[str] = [],
    viewers: list[str] = [],
    ignore_if_exists: bool = False,
) -> None

Method creating a new dataspace on the ETP server. This function is limited to creating a single dataspace with optional access-control list (ACL) information.

PARAMETER DESCRIPTION
dataspace_uri

The ETP dataspace uri, or path, to create. If it is a dataspace path (of the form 'foo/bar') it will be converted to the dataspace uri "eml:///dataspace('foo/bar')".

TYPE: str | DataspaceURI

legal_tags

List of legal tag strings for the ACL. The default is an empty list.

TYPE: list[str] DEFAULT: []

other_relevant_data_countries

List of data countries for the ACL. The default is an empty list.

TYPE: list[str] DEFAULT: []

owners

List of owners for the ACL. The default is an empty list.

TYPE: list[str] DEFAULT: []

viewers

List of viewers for the ACL. The default is an empty list.

TYPE: list[str] DEFAULT: []

ignore_if_exists

When True the method silently ignores any ETPError with error code 5 (EINVALID_ARGUMENT). This error occurs if the dataspace already exists on the server. Otherwise, all errors are raised. Default is False.

TYPE: bool DEFAULT: False

Source code in src/rddms_io/client.py
async def create_dataspace(
    self,
    dataspace_uri: str | DataspaceURI,
    legal_tags: list[str] = [],
    other_relevant_data_countries: list[str] = [],
    owners: list[str] = [],
    viewers: list[str] = [],
    ignore_if_exists: bool = False,
) -> None:
    """
    Method creating a new dataspace on the ETP server. This function is
    limited to creating a single dataspace with optional access-control
    list (ACL) information.

    Parameters
    ----------
    dataspace_uri
        The ETP dataspace uri, or path, to create. If it is a dataspace
        path (of the form `'foo/bar'`) it will be converted to the
        dataspace uri `"eml:///dataspace('foo/bar')"`.
    legal_tags
        List of legal tag strings for the ACL. The default is an empty
        list.
    other_relevant_data_countries: list[str]
        List of data countries for the ACL. The default is an empty list.
    owners
        List of owners for the ACL. The default is an empty list.
    viewers
        List of viewers for the ACL. The default is an empty list.
    ignore_if_exists
        When `True` the method silently ignores any `ETPError` with error
        code `5` (`EINVALID_ARGUMENT`). This error occurs if the dataspace
        already exists on the server. Otherwise, all errors are raised.
        Default is `False`.
    """
    dataspace_uri = DataspaceURI.from_any_etp_uri(dataspace_uri)

    # A UTC timestamp in microseconds.
    now = int(datetime.datetime.now(datetime.timezone.utc).timestamp() * 1e6)

    def acl_parsing(acl_key: str, acl: list[str]) -> dict[str, DataValue]:
        if not acl:
            return {}
        return {acl_key: DataValue(item=ArrayOfString(values=acl))}

    custom_data = {
        **acl_parsing("legaltags", legal_tags),
        **acl_parsing("otherRelevantDataCountries", other_relevant_data_countries),
        **acl_parsing("owners", owners),
        **acl_parsing("viewers", viewers),
    }

    try:
        responses = await self.etp_client.send_and_recv(
            PutDataspaces(
                dataspaces={
                    str(dataspace_uri): Dataspace(
                        uri=str(dataspace_uri),
                        path=dataspace_uri.dataspace,
                        store_created=now,
                        store_last_write=now,
                        custom_data=custom_data,
                    )
                }
            )
        )
    except ETPError as e:
        if ignore_if_exists and e.code == 5:
            logger.info(
                f"Ignoring error in RDDMSClient.create_dataspace with message '{e}'"
            )
            return

        raise

    parse_and_raise_response_errors(
        responses, PutDataspacesResponse, "RDDMSClient.create_dataspace"
    )
    assert any([str(dataspace_uri) in response.success for response in responses])

start_transaction async

start_transaction(
    dataspace_uri: str | DataspaceURI,
    read_only: bool,
    debounce: bool | float = False,
) -> UUID

Method issuing a StartTransaction-ETP message, with optional debouncing to retry in case the dataspace is occupied with a different write transaction. Note that this method (unlike the raw ETP-message) is limited to starting a transaction on a single dataspace.

PARAMETER DESCRIPTION
dataspace_uri

A dataspace URI, either as a string or a DataspaceURI-object.

TYPE: str | DataspaceURI

read_only

Set to False for writing, and True for reading. It is mandatory to use a transaction when writing, but optional for reading.

TYPE: bool

debounce

Flag to toggle debouncing or maximum total debouncing time. If set to True, the client will continue to debounce forever until a transaction is started. Setting debounce to a floating point number will set a maximum total debouncing time (it will potentially retry several times within that window). Default is False, i.e., no debouncing is done, and failure to start a transaction will result in an error.

TYPE: bool | float DEFAULT: False

RETURNS DESCRIPTION
UUID

A standard library UUID with the transaction uuid.
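The debounce behaviour amounts to sleeping between retries until a total time budget is used up. The sketch below shows one way such sleep intervals could be generated. The function retry_intervals and its exponential back-off are assumptions made for illustration. The intervals actually used by the library's timeout_intervals helper are not documented here and may differ.

```python
import itertools
from collections.abc import Iterator
from typing import Optional


def retry_intervals(
    total: Optional[float], first: float = 0.5, factor: float = 2.0
) -> Iterator[float]:
    """Yield sleep intervals that never add up to more than `total` seconds.

    With `total=None` the generator never stops, which corresponds to
    retrying forever.
    """
    elapsed = 0.0
    interval = first
    while total is None or elapsed < total:
        if total is not None:
            # Trim the last interval so the time budget is not exceeded.
            interval = min(interval, total - elapsed)
        yield interval
        elapsed += interval
        interval *= factor


print(list(retry_intervals(3.0)))  # [0.5, 1.0, 1.5]
print(list(itertools.islice(retry_intervals(None), 4)))  # [0.5, 1.0, 2.0, 4.0]
```

A bounded budget corresponds to passing a float as debounce, and the unbounded case to debounce=True.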

Source code in src/rddms_io/client.py
async def start_transaction(
    self,
    dataspace_uri: str | DataspaceURI,
    read_only: bool,
    debounce: bool | float = False,
) -> uuid.UUID:
    """
    Method issuing a `StartTransaction`-ETP message, with optional
    debouncing to retry in case the dataspace is occupied with a different
    write transaction. Note that this method (unlike the raw ETP-message)
    is limited to starting a transaction on a single dataspace.

    Parameters
    ----------
    dataspace_uri: str | DataspaceURI
        A dataspace URI, either as a string or a `DataspaceURI`-object.
    read_only: bool
        Set to `False` for writing, and `True` for reading. It is mandatory
        to use a transaction when writing, but optional for reading.
    debounce: bool | float
        Flag to toggle debouncing or maximum total debouncing time.
        If set to `True`, the client will continue to debounce forever
        until a transaction is started. Setting debounce to a floating
        point number will set a maximum total debouncing time (it will
        potentially retry several times within that window). Default is
        `False`, i.e., no debouncing is done, and failure to start a
        transaction will result in an error.

    Returns
    -------
    uuid.UUID
        A standard library UUID with the transaction uuid.
    """

    dataspace_uri = str(DataspaceURI.from_any_etp_uri(dataspace_uri))
    total_timeout = debounce

    if isinstance(debounce, bool):
        total_timeout = None

    for ti in timeout_intervals(total_timeout):
        try:
            responses = await self.etp_client.send_and_recv(
                StartTransaction(
                    read_only=read_only, dataspace_uris=[dataspace_uri]
                ),
            )
        except ETPError as e:
            # Check if the error corresponds to the ETP Error
            # `EMAX_TRANSACTIONS_EXCEEDED`.
            if e.code != 15:
                raise

            if debounce:
                logger.info(f"Failed to start transaction retrying in {ti} seconds")
                await asyncio.sleep(ti)
                continue

            raise

        parse_and_raise_response_errors(
            responses, StartTransactionResponse, "RDDMSClient.start_transaction"
        )

        assert len(responses) == 1
        response = responses[0]

        if not response.successful:
            raise ETPTransactionFailure(str(response.failure_reason))

        transaction_uuid = uuid.UUID(str(response.transaction_uuid.root))
        logger.debug(f"Started transaction with uuid: {transaction_uuid}")
        return transaction_uuid

    raise ETPTransactionFailure(
        f"Failed to start transaction after {total_timeout} seconds",
    )

commit_transaction async

commit_transaction(transaction_uuid: bytes | str | UUID | Uuid) -> None

Method for committing a transaction after completing all tasks that need to be synchronized between the client and the server.

PARAMETER DESCRIPTION
transaction_uuid

The transaction uuid for the current transaction. This will typically be the uuid from the RDDMSClient.start_transaction-method.

TYPE: bytes | str | UUID | Uuid

Source code in src/rddms_io/client.py
async def commit_transaction(
    self,
    transaction_uuid: bytes | str | uuid.UUID | Uuid,
) -> None:
    """
    Method for committing a transaction after completing all tasks that
    need to be synchronized between the client and the server.

    Parameters
    ----------
    transaction_uuid: bytes | str | uuid.UUID | Uuid
        The transaction uuid for the current transaction. This will
        typically be the uuid from the
        `RDDMSClient.start_transaction`-method.
    """
    if isinstance(transaction_uuid, uuid.UUID):
        transaction_uuid = Uuid(transaction_uuid.bytes)
    elif isinstance(transaction_uuid, str | bytes):
        transaction_uuid = Uuid(transaction_uuid)

    responses = await self.etp_client.send_and_recv(
        CommitTransaction(transaction_uuid=transaction_uuid),
    )

    assert len(responses) == 1
    response = responses[0]

    if not response.successful:
        raise ETPTransactionFailure(str(response))

    logger.debug(f"Committed transaction with uuid: {transaction_uuid}")

rollback_transaction async

rollback_transaction(transaction_uuid: bytes | str | UUID | Uuid) -> None

Method for cancelling a running transaction. This will tell the server that it should disregard any changes incurred by the current transaction.

PARAMETER DESCRIPTION
transaction_uuid

The transaction uuid for the current transaction. This will typically be the uuid from the RDDMSClient.start_transaction-method.

TYPE: bytes | str | UUID | Uuid
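The three transaction methods are typically combined as start, do the work, then commit on success or roll back on failure. The sketch below wraps that pattern in an async context manager. ToyClient is a hypothetical stand-in that only records which methods were called, and write_transaction is an illustrative helper, not part of rddms_io.

```python
import asyncio
import uuid
from contextlib import asynccontextmanager


class ToyClient:
    """Hypothetical stand-in exposing the three transaction methods."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def start_transaction(self, dataspace_uri, read_only, debounce=False):
        self.calls.append("start")
        return uuid.uuid4()

    async def commit_transaction(self, transaction_uuid) -> None:
        self.calls.append("commit")

    async def rollback_transaction(self, transaction_uuid) -> None:
        self.calls.append("rollback")


@asynccontextmanager
async def write_transaction(client, dataspace_uri):
    """Commit if the block succeeds, roll back if it raises."""
    transaction_uuid = await client.start_transaction(dataspace_uri, read_only=False)
    try:
        yield transaction_uuid
    except Exception:
        await client.rollback_transaction(transaction_uuid)
        raise
    else:
        await client.commit_transaction(transaction_uuid)


async def main() -> tuple[list[str], list[str]]:
    ok, failing = ToyClient(), ToyClient()

    async with write_transaction(ok, "eml:///dataspace('foo/bar')"):
        pass  # Upload objects and arrays here.

    try:
        async with write_transaction(failing, "eml:///dataspace('foo/bar')"):
            raise RuntimeError("upload failed")
    except RuntimeError:
        pass  # The original error is re-raised after the rollback.

    return ok.calls, failing.calls


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Rolling back in the except-branch and re-raising keeps the original error visible to the caller while leaving the dataspace unchanged.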

Source code in src/rddms_io/client.py
async def rollback_transaction(
    self, transaction_uuid: bytes | str | uuid.UUID | Uuid
) -> None:
    """
    Method for cancelling a running transaction. This will tell the server
    that it should disregard any changes incurred by the current
    transaction.

    Parameters
    ----------
    transaction_uuid: bytes | str | uuid.UUID | Uuid
        The transaction uuid for the current transaction. This will
        typically be the uuid from the
        `RDDMSClient.start_transaction`-method.
    """
    if isinstance(transaction_uuid, uuid.UUID | str):
        transaction_uuid = Uuid(str(transaction_uuid).encode())
    elif isinstance(transaction_uuid, bytes):
        transaction_uuid = Uuid(transaction_uuid)

    response = await self.etp_client.send_and_recv(
        RollbackTransaction(transaction_uuid=transaction_uuid)
    )
    parse_and_raise_response_errors(
        [response], RollbackTransactionResponse, "RDDMSClient.rollback_transaction"
    )
    if not response.successful:
        raise ETPTransactionFailure(str(response))

list_objects_under_dataspace async

list_objects_under_dataspace(
    dataspace_uri: DataspaceURI | str,
    data_object_types: list[str | Type[AbstractCitedDataObject]] = [],
    count_objects: bool = True,
    store_last_write_filter: int | None = None,
) -> list[Resource]

This method will list all objects under a given dataspace.

PARAMETER DESCRIPTION
dataspace_uri

The uri of the dataspace whose objects should be listed.

TYPE: DataspaceURI | str

data_object_types

Object types to look for. This can either be a list of strings, e.g., ["eml20.*", "resqml20.obj_Grid2dRepresentation"] to query all Energistics Common version 2.0-objects and obj_Grid2dRepresentation-objects from RESQML v2.0.1, or it can be a list of objects from resqml_objects.v201, e.g., [ro.obj_Grid2dRepresentation, ro.obj_LocalDepth3dCrs]. Default is [], i.e., an empty list which means that all data object types will be returned.

TYPE: list[str | Type[AbstractCitedDataObject]] DEFAULT: []

count_objects

Toggle if the number of target and source objects should be counted. Default is True.

TYPE: bool DEFAULT: True

store_last_write_filter

Filter to only include objects that were written after the provided timestamp. Default is None, meaning no filter is applied. Note that the timestamp should be in microsecond resolution.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
list[Resource]

A list of Resource-objects.
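To get a feel for how the string form of data_object_types reads, the wildcard patterns can be mimicked locally with shell-style matching. This is purely an illustration: the actual filtering is done by the server, and the hypothetical helper matches_any below is not part of rddms_io.

```python
from fnmatch import fnmatchcase


def matches_any(qualified_type: str, patterns: list[str]) -> bool:
    """Return True if the type matches a pattern; an empty list matches all."""
    if not patterns:
        return True
    return any(fnmatchcase(qualified_type, pattern) for pattern in patterns)


patterns = ["eml20.*", "resqml20.obj_Grid2dRepresentation"]

print(matches_any("eml20.obj_EpcExternalPartReference", patterns))  # True
print(matches_any("resqml20.obj_LocalDepth3dCrs", patterns))  # False
```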

Source code in src/rddms_io/client.py
async def list_objects_under_dataspace(
    self,
    dataspace_uri: DataspaceURI | str,
    data_object_types: list[str | typing.Type[ro.AbstractCitedDataObject]] = [],
    count_objects: bool = True,
    store_last_write_filter: int | None = None,
) -> list[Resource]:
    """
    This method will list all objects under a given dataspace.

    Parameters
    ----------
    dataspace_uri
        The uri of the dataspace to list objects.
    data_object_types
        Object types to look for. This can either be a list of strings,
        e.g., `["eml20.*", "resqml20.obj_Grid2dRepresentation"]` to query
        all Energistics Common version 2.0-objects and
        `obj_Grid2dRepresentation`-objects from RESQML v2.0.1, or it can be
        a list of objects from `resqml_objects.v201`, e.g.,
        `[ro.obj_Grid2dRepresentation, ro.obj_LocalDepth3dCrs]`. Default is
        `[]`, i.e., an empty list which means that all data object types
        will be returned.
    count_objects
        Toggle if the number of target and source objects should be
        counted. Default is `True`.
    store_last_write_filter
        Filter to only include objects that were written after the provided
        timestamp. Default is `None`, meaning no filter is applied. Note
        that the timestamp should be in microsecond resolution.

    Returns
    -------
    list[Resource]
        A list of
        [`Resource`][energistics.etp.v12.datatypes.object.Resource]-objects.
    """
    dataspace_uri = str(DataspaceURI.from_any_etp_uri(dataspace_uri))
    data_object_types = [
        dot if isinstance(dot, str) else dot.get_qualified_type()
        for dot in data_object_types
    ]

    gr = GetResources(
        context=ContextInfo(
            uri=dataspace_uri,
            depth=1,  # Ignored when `scope="self"`.
            data_object_types=data_object_types,
            # TODO: Check if `navigable_edges` give any different results.
            navigable_edges="Primary",
        ),
        scope="self",
        count_objects=count_objects,
        store_last_write_filter=store_last_write_filter,
        # Use the `list_linked_objects`-method below to see edges.
        include_edges=False,
    )

    responses = await self.etp_client.send_and_recv(gr)

    parse_and_raise_response_errors(
        responses, GetResourcesResponse, "RDDMSClient.list_objects_under_dataspace"
    )
    return [resource for response in responses for resource in response.resources]

list_linked_objects async

list_linked_objects(
    start_uri: DataObjectURI | str,
    data_object_types: list[str | Type[AbstractCitedDataObject]] = [],
    store_last_write_filter: datetime | int | None = None,
    depth: int = 1,
) -> LinkedObjects

Method listing all objects that are linked to the provided object uri. That is, starting from the object indexed by the uri start_uri it finds all objects (sources) that link to it, and all objects (targets) it links to.

PARAMETER DESCRIPTION
start_uri

An ETP data object uri to start the query from.

TYPE: DataObjectURI | str

data_object_types

A filter to limit which types of objects to include in the results. As a string it is of the form eml20.obj_EpcExternalPartReference for a specific object, or eml20.* for all Energistics Common objects. For the RESQML v2.0.1 objects it is similarly resqml20.*, or a specific type instead of the wildcard *. This can also be classes from resqml_objects.v201, in which case the filter will be constructed. Default is [], meaning no filter is applied.

TYPE: list[str | Type[AbstractCitedDataObject]] DEFAULT: []

store_last_write_filter

Filter to only include objects that are written after the provided datetime or timestamp. Default is None, meaning no filter is applied. Note that the timestamp should be in microsecond resolution.

TYPE: datetime | int | None DEFAULT: None

depth

The number of link levels to follow. Setting depth = 1 will only return targets and sources that are directly linked to the start object. With depth = 2 we also get the objects that are linked to those targets and sources. Default is 1.

TYPE: int DEFAULT: 1

RETURNS DESCRIPTION
LinkedObjects

A container (NamedTuple) with resources and edges for the sources and targets of the start-object.
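The meaning of sources, targets and depth can be illustrated on a toy graph held in memory. This sketch is not how the server resolves links. The function linked and the object names are hypothetical, and an edge (source, target) means that the source links to the target.

```python
def linked(edges: list[tuple[str, str]], start: str, depth: int):
    """Collect sources and targets of `start` within `depth` links."""

    def walk(neighbours) -> set[str]:
        seen: set[str] = set()
        frontier = {start}
        for _ in range(depth):
            # Step one link further out, skipping already visited objects.
            frontier = {
                n for node in frontier for n in neighbours(node)
            } - seen - {start}
            seen |= frontier
        return seen

    # Sources are objects that link to a node; targets are objects it links to.
    sources = walk(lambda node: [s for s, t in edges if t == node])
    targets = walk(lambda node: [t for s, t in edges if s == node])
    return sources, targets


# The property links to the grid, and the grid links to the crs.
edges = [("property", "grid"), ("grid", "crs")]

print(linked(edges, "grid", depth=1))  # ({'property'}, {'crs'})
```

With depth=2 and the property as the start object, the targets include both the grid and the crs.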

Source code in src/rddms_io/client.py
async def list_linked_objects(
    self,
    start_uri: DataObjectURI | str,
    data_object_types: list[str | typing.Type[ro.AbstractCitedDataObject]] = [],
    store_last_write_filter: datetime.datetime | int | None = None,
    depth: int = 1,
) -> LinkedObjects:
    """
    Method listing all objects that are linked to the provided object uri.
    That is, starting from the object indexed by the uri `start_uri` it
    finds all objects (sources) that link to it, and all objects (targets)
    it links to.

    Parameters
    ----------
    start_uri: DataObjectURI | str
        An ETP data object uri to start the query from.
    data_object_types: list[str | typing.Type[ro.AbstractCitedDataObject]]
        A filter to limit which types of objects to include in the results.
        As a string it is on the form `eml20.obj_EpcExternalPartReference`
        for a specific object, or `eml20.*` for all Energistics Common
        objects. For the RESQML v2.0.1 objects it is similarly
        `resqml20.*`, or a specific type instead of the wildcard `*`. This
        can also be classes from `resqml_objects.v201`, in which case the
        filter will be constructed. Default is `[]`, meaning no filter is
        applied.
    store_last_write_filter: datetime.datetime | int | None
        Filter to only include objects that are written after the provided
        datetime or timestamp. Default is `None`, meaning no filter is
        applied. Note that the timestamp should be in microsecond
        resolution.
    depth: int
        The number of link levels to follow. Setting `depth = 1` will only
        return targets and sources that are directly linked to the start
        object. With `depth = 2` we also get the objects that are linked to
        those targets and sources. Default is `1`.

    Returns
    -------
    LinkedObjects
        A container (`NamedTuple`) with resources and edges for the sources
        and targets of the start-object.
    """
    data_object_types = [
        dot if isinstance(dot, str) else dot.get_qualified_type()
        for dot in data_object_types
    ]
    if isinstance(store_last_write_filter, datetime.datetime):
        # Convert `datetime`-object to a microsecond resolution timestamp.
        store_last_write_filter = int(store_last_write_filter.timestamp() * 1e6)

    gr_sources = GetResources(
        context=ContextInfo(
            uri=str(start_uri),
            depth=depth,
            data_object_types=data_object_types,
            navigable_edges=RelationshipKind.PRIMARY,
        ),
        # Setting the scope to `SOURCES_OR_SELF` returns the start-object
        # resource _and_ the edge(s) between the start-object and its
        # sources.
        scope=ContextScopeKind.SOURCES_OR_SELF,
        count_objects=True,
        store_last_write_filter=store_last_write_filter,
        include_edges=True,
    )

    gr_targets = GetResources(
        context=ContextInfo(
            uri=str(start_uri),
            depth=depth,
            data_object_types=data_object_types,
            navigable_edges=RelationshipKind.PRIMARY,
        ),
        scope=ContextScopeKind.TARGETS_OR_SELF,
        count_objects=True,
        store_last_write_filter=store_last_write_filter,
        include_edges=True,
    )

    task_responses = await asyncio.gather(
        self.etp_client.send_and_recv(gr_sources),
        self.etp_client.send_and_recv(gr_targets),
    )

    sources_responses = task_responses[0]
    targets_responses = task_responses[1]

    source_edges = [
        e
        for grer in filter(
            lambda e: isinstance(e, GetResourcesEdgesResponse), sources_responses
        )
        for e in grer.edges
    ]
    source_resources = [
        r
        for grr in filter(
            lambda e: isinstance(e, GetResourcesResponse), sources_responses
        )
        for r in grr.resources
    ]

    target_edges = [
        e
        for grer in filter(
            lambda e: isinstance(e, GetResourcesEdgesResponse), targets_responses
        )
        for e in grer.edges
    ]
    target_resources = [
        r
        for grr in filter(
            lambda e: isinstance(e, GetResourcesResponse), targets_responses
        )
        for r in grr.resources
    ]

    self_resource = next(filter(lambda sr: sr.uri == start_uri, source_resources))

    # Remove "self" from list of resources.
    source_resources = list(
        filter(
            lambda sr: sr.uri != start_uri,
            source_resources,
        )
    )
    target_resources = list(
        filter(
            lambda tr: tr.uri != start_uri,
            target_resources,
        )
    )

    return LinkedObjects(
        start_uri=start_uri,
        self_resource=self_resource,
        source_resources=source_resources,
        source_edges=source_edges,
        target_resources=target_resources,
        target_edges=target_edges,
    )
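
The four list comprehensions above share one pattern: keep the responses of a single message type, then flatten one attribute of each. A minimal sketch of that pattern follows. `EdgesResponse`, `ResourcesResponse` and `flatten_by_type` are hypothetical stand-ins used for illustration only, not the real ETP message types or a helper from the library.

```python
from dataclasses import dataclass, field
from typing import Any, Iterable


# Hypothetical stand-ins for `GetResourcesEdgesResponse` and
# `GetResourcesResponse`; the real ETP types carry more fields.
@dataclass
class EdgesResponse:
    edges: list = field(default_factory=list)


@dataclass
class ResourcesResponse:
    resources: list = field(default_factory=list)


def flatten_by_type(responses: Iterable[Any], kind: type, attr: str) -> list:
    """Collect the items in `attr` from every response of type `kind`."""
    return [
        item
        for response in responses
        if isinstance(response, kind)
        for item in getattr(response, attr)
    ]


# A mixed stream of responses, as a multipart reply might deliver them.
responses = [
    ResourcesResponse(resources=["self", "a"]),
    EdgesResponse(edges=["self->a"]),
    ResourcesResponse(resources=["b"]),
]
resources = flatten_by_type(responses, ResourcesResponse, "resources")
edges = flatten_by_type(responses, EdgesResponse, "edges")
```

The order of items within each type is preserved, which matters if later code relies on the order in which the server sent them.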

list_array_metadata async

list_array_metadata(
    ml_uris: list[str | DataObjectURI],
) -> dict[str, dict[str, DataArrayMetadata]]

Method used for listing array metadata for all arrays connected to the provided data object uris. This method downloads the data objects from the uris, and calls RDDMSClient.list_object_array_metadata to get the actual metadata. If the objects have already been downloaded, then using RDDMSClient.list_object_array_metadata will be more efficient.

The purpose of this method is to provide a more convenient way of exploring an RDDMS server without needing to handle data objects. It is recommended to use RDDMSClient.list_object_array_metadata if the objects have already been downloaded.

PARAMETER DESCRIPTION
ml_uris

A list of ETP data object uris.

TYPE: list[str | DataObjectURI]

RETURNS DESCRIPTION
dict[str, dict[str, DataArrayMetadata]]

A dictionary indexed by the data object uri, containing a new dictionary with the path in resource as the key and the metadata (the ETP datatype DataArrayMetadata) as the value. Note that if there is no array connected to a data object uri, there will be no entry in the returned dict for this uri.

See Also

RDDMSClient.list_object_array_metadata: A similar method that fetches the metadata from the objects themselves along with a dataspace uri. It is recommended to use list_object_array_metadata if you already have the objects in memory.
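
A minimal sketch of how the nested return value can be traversed. `ArrayMetadata` is a hypothetical stand-in for the ETP type `DataArrayMetadata` that keeps only a `dimensions` field, `summarize` is an illustrative helper, and the uri and path strings are placeholders rather than real ETP identifiers.

```python
from dataclasses import dataclass
from math import prod


@dataclass
class ArrayMetadata:
    # Hypothetical stand-in for `DataArrayMetadata`.
    dimensions: list


def summarize(metadata_map: dict) -> list:
    """Return (object uri, path in resource, element count) per array."""
    return [
        (uri, path, prod(meta.dimensions))
        for uri, arrays in metadata_map.items()
        for path, meta in arrays.items()
    ]


# Shape of the returned value: object uri -> path in resource -> metadata.
example = {
    "object-uri-1": {"path/a": ArrayMetadata(dimensions=[3, 4])},
}
rows = summarize(example)
```

Because objects without arrays have no entry in the outer dictionary, a lookup with `metadata_map.get(uri, {})` is a safe way to handle both cases.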

Source code in src/rddms_io/client.py
async def list_array_metadata(
    self,
    ml_uris: list[str | DataObjectURI],
) -> dict[str, dict[str, DataArrayMetadata]]:
    """
    Method used for listing array metadata for all arrays connected to the
    provided data object uris. This method downloads the data objects from
    the uris, and calls `RDDMSClient.list_object_array_metadata` to get the
    actual metadata. If the objects have already been downloaded, then
    using `RDDMSClient.list_object_array_metadata` will be more efficient.

    The purpose of this method is to provide a more convenient way of
    exploring an RDDMS server without needing to handle data objects. It is
    recommended to use `RDDMSClient.list_object_array_metadata` if the
    objects have already been downloaded.

    Parameters
    ----------
    ml_uris
        A list of ETP data object uris.

    Returns
    -------
    dict[str, dict[str, DataArrayMetadata]]
        A dictionary indexed by the data object uri, containing a new
        dictionary with the path in resource as the key and the metadata
        (the ETP datatype `DataArrayMetadata`) as the value. Note that if
        there is no array connected to a data object uri, there will be no
        entry in the returned dict for this uri.

    See Also
    --------
    [`RDDMSClient.list_object_array_metadata`][rddms_io.client.RDDMSClient.list_object_array_metadata]:
        A similar method that fetches the metadata from the objects
        themselves along with a dataspace uri. It is recommended to use
        `list_object_array_metadata` if you already have the objects in
        memory.
    """
    ml_objects = await self.download_models(
        ml_uris=ml_uris,
        download_arrays=False,
        download_linked_objects=False,
    )

    if not ml_objects:
        return {}

    ml_uris = [DataObjectURI.from_uri(uri) for uri in ml_uris]
    dataspace_uris = [DataspaceURI.from_any_etp_uri(uri) for uri in ml_uris]

    array_metadata = await asyncio.gather(
        *[
            self.list_object_array_metadata(
                dataspace_uri=dataspace_uri,
                ml_objects=[ml_object.obj],
            )
            for dataspace_uri, ml_object in zip(dataspace_uris, ml_objects)
        ]
    )

    metadata_map = {}
    for am in array_metadata:
        metadata_map = {**metadata_map, **am}

    return metadata_map

list_object_array_metadata async

list_object_array_metadata(
    dataspace_uri: str | DataspaceURI,
    ml_objects: Sequence[AbstractCitedDataObject],
) -> dict[str, dict[str, DataArrayMetadata]]

Method used for listing array metadata for all arrays connected to the provided RESQML-objects. This method takes a dataspace uri and the objects themselves (instead of their uris), since the objects must be downloaded anyway to look up which arrays they link to.

PARAMETER DESCRIPTION
dataspace_uri

The ETP dataspace uri where the objects are located.

TYPE: str | DataspaceURI

ml_objects

A list (or any sequence) of objects that link to arrays.

TYPE: Sequence[AbstractCitedDataObject]

RETURNS DESCRIPTION
dict[str, dict[str, DataArrayMetadata]]

A dictionary indexed by the data object uri, containing a new dictionary with the path in resource as the key and the metadata (the ETP datatype DataArrayMetadata) as the value.

See Also

RDDMSClient.list_array_metadata: A similar method that looks up array metadata needing only the uris of the objects.
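
A minimal sketch of reusing objects that are already in memory. The keyword names `dataspace_uri` and `ml_objects` and the `.obj` attribute come from the source listings on this page. `FakeClient`, `Model` and `metadata_for_models` are hypothetical stand-ins so the example runs without a server.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Model:
    # Hypothetical stand-in for `RDDMSModel`; only `obj` is modelled.
    obj: str


class FakeClient:
    """Hypothetical stand-in for `RDDMSClient` that echoes its input."""

    async def list_object_array_metadata(self, dataspace_uri, ml_objects):
        return {f"{dataspace_uri}/{obj}": {} for obj in ml_objects}


async def metadata_for_models(client, dataspace_uri, models):
    """Pass already-downloaded objects instead of their uris."""
    return await client.list_object_array_metadata(
        dataspace_uri=dataspace_uri,
        ml_objects=[model.obj for model in models],
    )


metadata = asyncio.run(
    metadata_for_models(FakeClient(), "ds", [Model("grid"), Model("crs")])
)
```

With a real client the objects would typically come from an earlier `download_models` call, which avoids downloading them a second time.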

Source code in src/rddms_io/client.py
async def list_object_array_metadata(
    self,
    dataspace_uri: str | DataspaceURI,
    ml_objects: Sequence[ro.AbstractCitedDataObject],
) -> dict[str, dict[str, DataArrayMetadata]]:
    """
    Method used for listing array metadata for all arrays connected to the
    provided RESQML-objects. This method takes a dataspace uri and the
    objects themselves (instead of their uris), since the objects must be
    downloaded anyway to look up which arrays they link to.

    Parameters
    ----------
    dataspace_uri: str | DataspaceURI
        The ETP dataspace uri where the objects are located.
    ml_objects: Sequence[ro.AbstractCitedDataObject]
        A list (or any sequence) of objects that link to arrays.

    Returns
    -------
    dict[str, dict[str, DataArrayMetadata]]
        A dictionary indexed by the data object uri, containing a new
        dictionary with the path in resource as the key and the metadata
        (the ETP datatype `DataArrayMetadata`) as the value.

    See Also
    --------
    [`RDDMSClient.list_array_metadata`][rddms_io.client.RDDMSClient.list_array_metadata]:
        A similar method that looks up array metadata needing only the uris
        of the objects.
    """
    dataspace_uri = str(DataspaceURI.from_any_etp_uri(dataspace_uri))

    ml_uris = []
    tasks = []
    for obj in ml_objects:
        obj_dais = []
        for hdf5_dataset in find_hdf5_datasets(obj):
            path_in_resource = hdf5_dataset.path_in_hdf_file
            epc_uri = hdf5_dataset.hdf_proxy.get_etp_data_object_uri(
                dataspace_path_or_uri=dataspace_uri,
            )

            dai = DataArrayIdentifier(
                uri=epc_uri,
                path_in_resource=path_in_resource,
            )
            obj_dais.append(dai)

        if obj_dais:
            ml_uris.append(
                obj.get_etp_data_object_uri(dataspace_path_or_uri=dataspace_uri)
            )
            task = self.etp_client.send_and_recv(
                GetDataArrayMetadata(
                    data_arrays={dai.path_in_resource: dai for dai in obj_dais},
                )
            )
            tasks.append(task)

    if not tasks:
        logger.info(
            "There were no arrays connected to input objects with uris: "
            f"{[obj.get_etp_data_object_uri(dataspace_uri) for obj in ml_objects]}"
        )
        return {}

    task_responses = await asyncio.gather(*tasks)

    metadata_map = {}
    for uri, tr in zip(ml_uris, task_responses):
        parse_and_raise_response_errors(
            tr,
            GetDataArrayMetadataResponse,
            "RDDMSClient.list_object_array_metadata",
        )

        pirm = {}
        for response in tr:
            pirm = {**pirm, **response.array_metadata}

        metadata_map[uri] = pirm

    return metadata_map

upload_array async

upload_array(
    epc_uri: str | DataObjectURI,
    path_in_resource: str,
    data: NDArray[LogicalArrayDTypes],
) -> None

Method used for uploading a single array to an ETP server. This method will not work unless the user has set up a transaction for writing to the relevant dataspace. It should not be necessary for a user to call this method; prefer RDDMSClient.upload_model instead.

PARAMETER DESCRIPTION
epc_uri

An ETP data object uri to an obj_EpcExternalPartReference that is connected to the object that links to the provided array.

TYPE: str | DataObjectURI

path_in_resource

A key (typically an HDF5-key) that uniquely identifies the array along with the epc_uri. This key is found in the object that links to the provided array.

TYPE: str

data

A NumPy-array with the data.

TYPE: NDArray[LogicalArrayDTypes]

See Also

RDDMSClient.upload_model: A higher-level method that wraps transaction handling, data object uploading and array uploading in one go.
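
Arrays larger than the client's maximum array size are uploaded in blocks, each described by a list of start indices and a list of element counts per axis. The sketch below shows one simple way such blocks could be computed. It is not the library's `get_array_block_sizes`: `split_along_first_axis` is a hypothetical helper that only cuts along the first axis, which is a simplifying assumption.

```python
from math import prod


def split_along_first_axis(shape, itemsize, max_bytes):
    """Split an array of `shape` into blocks of at most `max_bytes` bytes.

    Simplification: blocks are only cut along the first axis, so one index
    along axis 0 (a "row") must fit in `max_bytes`.
    Returns (block_starts, block_counts).
    """
    row_bytes = prod(shape[1:]) * itemsize
    rows_per_block = max_bytes // row_bytes
    if rows_per_block < 1:
        raise ValueError("a single row does not fit in one block")

    starts, counts = [], []
    for first in range(0, shape[0], rows_per_block):
        rows = min(rows_per_block, shape[0] - first)
        starts.append([first] + [0] * (len(shape) - 1))
        counts.append([rows] + list(shape[1:]))
    return starts, counts


# A 10 x 4 array of 8-byte values with a 100-byte limit: 3 rows per block.
block_starts, block_counts = split_along_first_axis(
    (10, 4), itemsize=8, max_bytes=100
)

# Each block maps to a tuple of slices, one `slice(s, s + c)` per axis.
block_slices = [
    tuple(slice(s, s + c) for s, c in zip(starts, counts))
    for starts, counts in zip(block_starts, block_counts)
]
```

The same starts and counts describe both the slice taken from the local array and the region written on the server, so the two stay aligned by construction.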

Source code in src/rddms_io/client.py
async def upload_array(
    self,
    epc_uri: str | DataObjectURI,
    path_in_resource: str,
    data: npt.NDArray[utils_arrays.LogicalArrayDTypes],
) -> None:
    """
    Method used for uploading a single array to an ETP server. This method
    will not work unless the user has set up a transaction for writing to
    the relevant dataspace. It should not be necessary for a user to call
    this method; prefer `RDDMSClient.upload_model` instead.

    Parameters
    ----------
    epc_uri
        An ETP data object uri to an `obj_EpcExternalPartReference` that is
        connected to the object that links to the provided array.
    path_in_resource
        A key (typically an HDF5-key) that uniquely identifies the array
        along with the `epc_uri`. This key is found in the object that
        links to the provided array.
    data
        A NumPy-array with the data.

    See Also
    --------
    [`RDDMSClient.upload_model`][rddms_io.client.RDDMSClient.upload_model]:
        A higher-level method that wraps transaction handling, data object
        uploading and array uploading in one go.
    """
    # Fetch ETP logical and transport array types
    logical_array_type, transport_array_type = (
        utils_arrays.get_logical_and_transport_array_types(data.dtype)
    )

    # Create identifier for the data.
    dai = DataArrayIdentifier(
        uri=str(epc_uri),
        path_in_resource=path_in_resource,
    )

    # Get current time as a UTC-timestamp.
    now = int(datetime.datetime.now(datetime.timezone.utc).timestamp() * 1e6)

    # Allocate space on server for the array.
    responses = await self.etp_client.send_and_recv(
        PutUninitializedDataArrays(
            data_arrays={
                dai.path_in_resource: PutUninitializedDataArrayType(
                    uid=dai,
                    metadata=DataArrayMetadata(
                        dimensions=list(data.shape),
                        transport_array_type=transport_array_type,
                        logical_array_type=logical_array_type,
                        store_last_write=now,
                        store_created=now,
                    ),
                ),
            },
        ),
    )

    assert len(responses) == 1
    response = responses[0]

    self.etp_client.assert_response(response, PutUninitializedDataArraysResponse)
    assert len(response.success) == 1 and dai.path_in_resource in response.success

    # Check if we can upload the entire array in one go, or if we need to
    # upload it in smaller blocks.

    # TODO: Check if we should use `await` instead of `asyncio.gather`.
    # This is to ensure that the websockets heartbeat gets to run once in a
    # while.
    if data.nbytes > self.etp_client.max_array_size:
        tasks = []

        # Get list with starting indices in each block, and a list with the
        # number of elements along each axis for each block.
        block_starts, block_counts = utils_arrays.get_array_block_sizes(
            data.shape, data.dtype, self.etp_client.max_array_size
        )

        for starts, counts in zip(block_starts, block_counts):
            # Create slice-objects for each block.
            slices = tuple(
                map(
                    lambda s, c: slice(s, s + c),
                    np.array(starts).astype(int),
                    np.array(counts).astype(int),
                )
            )

            # Slice the array, and convert to the relevant ETP-array type.
            # Note in particular the extra `.data` after the call. The
            # data should not be of type `DataArray`, but `AnyArray`, so we
            # need to fetch it from the `DataArray`.
            etp_subarray_data = utils_arrays.get_etp_data_array_from_numpy(
                data[slices]
            ).data

            # Create an asynchronous task to upload a block to the
            # ETP-server.
            task = self.etp_client.send_and_recv(
                PutDataSubarrays(
                    data_subarrays={
                        dai.path_in_resource: PutDataSubarraysType(
                            uid=dai,
                            data=etp_subarray_data,
                            starts=starts,
                            counts=counts,
                        ),
                    },
                ),
            )
            tasks.append(task)

        # Upload all blocks.
        task_responses = await asyncio.gather(*tasks)

        # Flatten list of responses.
        responses = [
            response
            for task_response in task_responses
            for response in task_response
        ]

        # Check for successful responses.
        for response in responses:
            self.etp_client.assert_response(response, PutDataSubarraysResponse)
            assert (
                len(response.success) == 1
                and dai.path_in_resource in response.success
            )

        # Return after uploading all sub arrays.
        return

    # Convert NumPy data-array to an ETP-transport array.
    etp_array_data = utils_arrays.get_etp_data_array_from_numpy(data)

    # Pass entire array in one message.
    responses = await self.etp_client.send_and_recv(
        PutDataArrays(
            data_arrays={
                dai.path_in_resource: PutDataArraysType(
                    uid=dai,
                    array=etp_array_data,
                ),
            }
        )
    )

    assert len(responses) == 1
    response = responses[0]

    self.etp_client.assert_response(response, PutDataArraysResponse)
    assert len(response.success) == 1 and dai.path_in_resource in response.success
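
The listing above stamps the array metadata with the current time expressed as microseconds since the Unix epoch. A minimal sketch of that conversion follows. `to_microseconds` is a hypothetical helper name used for illustration.

```python
import datetime


def to_microseconds(moment: datetime.datetime) -> int:
    """Convert a datetime to integer microseconds since the Unix epoch."""
    return int(moment.timestamp() * 1e6)


# One second after the epoch, as a timezone-aware datetime.
one_second = datetime.datetime(
    1970, 1, 1, 0, 0, 1, tzinfo=datetime.timezone.utc
)

# The current time in UTC, as used for `store_created` in the listing.
now = to_microseconds(datetime.datetime.now(datetime.timezone.utc))
```

Naive `datetime` objects are interpreted as local time by `timestamp()`, so passing timezone-aware values avoids ambiguity.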

download_object_arrays async

download_object_arrays(
    dataspace_uri: str | DataspaceURI, ml_object: AbstractCitedDataObject
) -> dict[str, NDArray[LogicalArrayDTypes]]

Method accepting a dataspace_uri (or dataspace path) and a RESQML-object, and downloading all attached arrays (if any). This method is mainly used as a helper method for RDDMSClient.download_models.

PARAMETER DESCRIPTION
dataspace_uri

An ETP dataspace uri or path. This can be a string or a DataspaceURI-object.

TYPE: str | DataspaceURI

ml_object

An instance of a RESQML-object.

TYPE: AbstractCitedDataObject

RETURNS DESCRIPTION
dict[str, NDArray[LogicalArrayDTypes]]

A dictionary mapping the path_in_hdf_file-keys in ml_object to the corresponding array. Empty if ml_object does not reference any arrays.

See Also

RDDMSClient.download_models: The "full" method downloading objects, arrays and potentially linked objects.
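
The returned dictionary is built by downloading all referenced arrays concurrently and pairing each result with its key. A minimal sketch of that pattern follows. `fake_download_array` and `download_all` are hypothetical stand-ins so the example runs without a server.

```python
import asyncio


async def fake_download_array(path_in_resource: str) -> list:
    # Hypothetical stand-in for `RDDMSClient.download_array`; it returns a
    # one-element list holding the length of the key.
    await asyncio.sleep(0)
    return [len(path_in_resource)]


async def download_all(paths: list) -> dict:
    # `asyncio.gather` returns results in the order the awaitables were
    # passed in, so zipping with `paths` pairs each key with its array.
    arrays = await asyncio.gather(*(fake_download_array(p) for p in paths))
    return dict(zip(paths, arrays))


result = asyncio.run(download_all(["/a", "/bcd"]))
```

Relying on the ordering guarantee of `asyncio.gather` keeps the code free of explicit bookkeeping about which response belongs to which key.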

Source code in src/rddms_io/client.py
async def download_object_arrays(
    self,
    dataspace_uri: str | DataspaceURI,
    ml_object: ro.AbstractCitedDataObject,
) -> dict[str, npt.NDArray[utils_arrays.LogicalArrayDTypes]]:
    """
    Method accepting a `dataspace_uri` (or dataspace path) and a
    RESQML-object, and downloading all attached arrays (if any). This
    method is mainly used as a helper method for
    [`RDDMSClient.download_models`][rddms_io.client.RDDMSClient.download_models].

    Parameters
    ----------
    dataspace_uri
        An ETP dataspace uri or path. This can be a string or a
        [`DataspaceURI`][pyetp.uri.DataspaceURI]-object.
    ml_object
        An instance of a RESQML-object.

    Returns
    -------
    dict[str, npt.NDArray[utils_arrays.LogicalArrayDTypes]]
        A dictionary mapping the `path_in_hdf_file`-keys in `ml_object` to
        the corresponding array. Empty if `ml_object` does not reference
        any arrays.

    See Also
    --------
    [`RDDMSClient.download_models`][rddms_io.client.RDDMSClient.download_models]:
        The "full" method downloading objects, arrays and potentially
        linked objects.
    """
    dataspace_uri = str(DataspaceURI.from_any_etp_uri(dataspace_uri))
    ml_hds = find_hdf5_datasets(ml_object)

    if len(ml_hds) == 0:
        logger.info(
            f"Object {type(ml_object).__name__}, titled "
            f"'{ml_object.citation.title}', does not reference any arrays."
        )

        return {}

    tasks = []
    for hdf5_dataset in ml_hds:
        path_in_resource = hdf5_dataset.path_in_hdf_file
        epc_uri = hdf5_dataset.hdf_proxy.get_etp_data_object_uri(
            dataspace_path_or_uri=dataspace_uri
        )
        task = self.download_array(
            epc_uri=epc_uri,
            path_in_resource=path_in_resource,
        )
        tasks.append(task)

    arrays = await asyncio.gather(*tasks)
    data_arrays = {hdf.path_in_hdf_file: arr for hdf, arr in zip(ml_hds, arrays)}

    return data_arrays

download_array async

download_array(
    epc_uri: str | DataObjectURI, path_in_resource: str
) -> NDArray[LogicalArrayDTypes]

Method used for downloading a single array from an ETP server. It should not be necessary for a user to call this method; prefer RDDMSClient.download_models instead.

PARAMETER DESCRIPTION
epc_uri

An ETP data object uri to an obj_EpcExternalPartReference that is connected to the object that links to the provided array.

TYPE: str | DataObjectURI

path_in_resource

A key (typically an HDF5-key) that uniquely identifies the array along with the epc_uri. This key is found in the object that links to the provided array.

TYPE: str

RETURNS DESCRIPTION
data

A NumPy-array with the data.

TYPE: NDArray[LogicalArrayDTypes]

See Also

RDDMSClient.download_models: A higher-level method that wraps data object and array downloading in one go.
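
When an array is too large for a single message, it is fetched in blocks that are then written into a preallocated buffer. The sketch below illustrates both steps in one dimension using plain lists. `needs_blocks` and `assemble` are hypothetical helpers, and computing the transport size as element count times item size is an assumption about what the library's size helper does.

```python
from math import prod


def needs_blocks(dimensions, itemsize, max_array_size):
    """Mirror the size check: arrays at or above the limit are split."""
    return prod(dimensions) * itemsize >= max_array_size


def assemble(length, blocks):
    """Write (start, values) blocks into a zero-initialized 1-D buffer."""
    buffer = [0] * length
    for start, values in blocks:
        buffer[start : start + len(values)] = values
    return buffer


# Blocks can be written in any order since each carries its start index.
data = assemble(5, [(3, [40, 50]), (0, [10, 20, 30])])
```

In the multi-dimensional case the single start index becomes one `slice(s, s + c)` per axis, as in the source listing for this method.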

Source code in src/rddms_io/client.py
async def download_array(
    self,
    epc_uri: str | DataObjectURI,
    path_in_resource: str,
) -> npt.NDArray[utils_arrays.LogicalArrayDTypes]:
    """
    Method used for downloading a single array from an ETP server. It
    should not be necessary for a user to call this method; prefer
    `RDDMSClient.download_models` instead.

    Parameters
    ----------
    epc_uri: str | DataObjectURI
        An ETP data object uri to an `obj_EpcExternalPartReference` that is
        connected to the object that links to the provided array.
    path_in_resource: str
        A key (typically an HDF5-key) that uniquely identifies the array
        along with the `epc_uri`. This key is found in the object that
        links to the provided array.

    Returns
    -------
    data: npt.NDArray[utils_arrays.LogicalArrayDTypes]
        A NumPy-array with the data.

    See Also
    --------
    [`RDDMSClient.download_models`][rddms_io.client.RDDMSClient.download_models]:
        A higher-level method that wraps data object and array downloading
        in one go.
    """
    # Create identifier for the data.
    dai = DataArrayIdentifier(
        uri=str(epc_uri),
        path_in_resource=path_in_resource,
    )

    responses = await self.etp_client.send_and_recv(
        GetDataArrayMetadata(data_arrays={dai.path_in_resource: dai}),
    )

    assert len(responses) == 1
    response = responses[0]

    self.etp_client.assert_response(response, GetDataArrayMetadataResponse)
    assert (
        len(response.array_metadata) == 1
        and dai.path_in_resource in response.array_metadata
    )

    metadata = response.array_metadata[dai.path_in_resource]

    # Check if the array is too large to download in a single message.
    if (
        utils_arrays.get_transport_array_size(
            metadata.transport_array_type, metadata.dimensions
        )
        >= self.etp_client.max_array_size
    ):
        transport_dtype = utils_arrays.get_dtype_from_any_array_type(
            metadata.transport_array_type,
        )
        # NOTE: The logical array type is not yet supported by the
        # open-etp-server. As such the transport array type will be the
        # actual array type used. We only add this call to prepare for when
        # it will be used.
        logical_dtype = utils_arrays.get_dtype_from_any_logical_array_type(
            metadata.logical_array_type,
        )
        if logical_dtype != np.dtype(np.bool_):
            # If this debug message is triggered we should test the
            # mapping.
            logger.debug(
                "Logical array type has changed: "
                f"{metadata.logical_array_type = }, with {logical_dtype = }"
            )

        # Create a buffer for the data.
        data = np.zeros(metadata.dimensions, dtype=transport_dtype)

        # Get list with starting indices in each block, and a list with the
        # number of elements along each axis for each block.
        block_starts, block_counts = utils_arrays.get_array_block_sizes(
            data.shape, data.dtype, self.etp_client.max_array_size
        )

        def data_subarrays_key(pir: str, i: int) -> str:
            return pir + f" ({i})"

        # TODO: Consider using `await` instead of `asyncio.gather` to give
        # the websockets connection time to run the heartbeat
        # communication.
        tasks = []
        for i, (starts, counts) in enumerate(zip(block_starts, block_counts)):
            task = self.etp_client.send_and_recv(
                GetDataSubarrays(
                    data_subarrays={
                        data_subarrays_key(
                            dai.path_in_resource, i
                        ): GetDataSubarraysType(
                            uid=dai,
                            starts=starts,
                            counts=counts,
                        ),
                    },
                ),
            )
            tasks.append(task)

        task_responses = await asyncio.gather(*tasks)
        responses = [
            response
            for task_response in task_responses
            for response in task_response
        ]

        data_blocks = []
        for i, response in enumerate(responses):
            self.etp_client.assert_response(response, GetDataSubarraysResponse)
            assert (
                len(response.data_subarrays) == 1
                and data_subarrays_key(dai.path_in_resource, i)
                in response.data_subarrays
            )

            data_block = response.data_subarrays[
                data_subarrays_key(dai.path_in_resource, i)
            ].to_numpy_array()
            data_blocks.append(data_block)

        for data_block, starts, counts in zip(
            data_blocks, block_starts, block_counts
        ):
            # Create slice-objects for each block.
            slices = tuple(
                map(
                    lambda s, c: slice(s, s + c),
                    np.array(starts).astype(int),
                    np.array(counts).astype(int),
                )
            )
            data[slices] = data_block

        # Return after fetching all sub arrays.
        return data

    # Download the full array in one go.
    responses = await self.etp_client.send_and_recv(
        GetDataArrays(data_arrays={dai.path_in_resource: dai}),
    )

    assert len(responses) == 1
    response = responses[0]

    self.etp_client.assert_response(response, GetDataArraysResponse)
    assert (
        len(response.data_arrays) == 1
        and dai.path_in_resource in response.data_arrays
    )

    return response.data_arrays[dai.path_in_resource].to_numpy_array()

upload_model async

upload_model(
    dataspace_uri: str | DataspaceURI,
    ml_objects: Sequence[AbstractCitedDataObject],
    data_arrays: Mapping[str, Sequence[NDArray[LogicalArrayDTypes]]] = {},
    handle_transaction: bool = True,
    debounce: bool | float = False,
) -> list[str]

The main driver method for uploading data to an ETP server. This method takes in a dataspace uri (for uploading to multiple dataspaces you need to call RDDMSClient.upload_model multiple times), a set of RESQML-objects, and a mapping of data arrays that are indexed by their path in resource (which is found in the RESQML-objects as well).

PARAMETER DESCRIPTION
dataspace_uri

An ETP dataspace uri.

TYPE: str | DataspaceURI

ml_objects

A sequence of RESQML v2.0.1-objects.

TYPE: Sequence[AbstractCitedDataObject]

data_arrays

A mapping, e.g., a dictionary, of data arrays where the paths in resource (found in the RESQML-objects) are the keys. Default is {}, meaning that only the RESQML-objects will be uploaded.

TYPE: Mapping[str, Sequence[NDArray[LogicalArrayDTypes]]] DEFAULT: {}

handle_transaction

A flag to toggle if RDDMSClient.upload_model should start and commit the transaction towards the dataspace. Default is True, and the method will ensure that the transaction handling is done correctly.

TYPE: bool DEFAULT: True

debounce

Parameter to decide if RDDMSClient.upload_model should retry starting a transaction if it initially fails. See RDDMSClient.start_transaction for a more in-depth explanation of the parameter. Default is False, i.e., no debouncing will occur and the method will fail if it is unable to start a transaction.

TYPE: bool | float DEFAULT: False

RETURNS DESCRIPTION
list[str]

A list of ETP data object uris to the uploaded objects.

See Also

RDDMSClient.download_models: The reverse operation.

RDDMSClient.start_transaction: Method for setting up a transaction. It is only necessary to interact with this method if handle_transaction=False.

RDDMSClient.commit_transaction: Method for committing a transaction. It is only necessary to interact with this method if handle_transaction=False.
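
A minimal sketch of the call order when `handle_transaction=False`. The method and keyword names follow the source listing for this method. `FakeClient` and `upload_batches` are hypothetical stand-ins that record calls instead of contacting a server. Whether a real server accepts several uploads inside one transaction is an assumption that is not confirmed by this page.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in that records calls instead of using a server."""

    def __init__(self):
        self.calls = []

    async def start_transaction(self, dataspace_uri, read_only, debounce):
        self.calls.append("start_transaction")
        return "transaction-uuid"

    async def upload_model(
        self, dataspace_uri, ml_objects, data_arrays, handle_transaction
    ):
        self.calls.append(f"upload_model({len(ml_objects)})")
        return [f"uri-{i}" for i, _ in enumerate(ml_objects)]

    async def commit_transaction(self, transaction_uuid):
        self.calls.append("commit_transaction")


async def upload_batches(client, dataspace_uri, batches):
    """Start a transaction, upload each batch, then commit once."""
    transaction_uuid = await client.start_transaction(
        dataspace_uri=dataspace_uri, read_only=False, debounce=False
    )
    uris = []
    for ml_objects in batches:
        uris += await client.upload_model(
            dataspace_uri=dataspace_uri,
            ml_objects=ml_objects,
            data_arrays={},
            handle_transaction=False,
        )
    await client.commit_transaction(transaction_uuid=transaction_uuid)
    return uris


client = FakeClient()
uris = asyncio.run(upload_batches(client, "ds", [["a"], ["b", "c"]]))
```

With the default `handle_transaction=True` none of this is needed, since the start and commit calls are made inside `upload_model`.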

Source code in src/rddms_io/client.py
async def upload_model(
    self,
    dataspace_uri: str | DataspaceURI,
    ml_objects: Sequence[ro.AbstractCitedDataObject],
    data_arrays: typing.Mapping[
        str, Sequence[npt.NDArray[utils_arrays.LogicalArrayDTypes]]
    ] = {},
    handle_transaction: bool = True,
    debounce: bool | float = False,
) -> list[str]:
    """
    The main driver method for uploading data to an ETP server. This method
    takes in a dataspace uri (for uploading to multiple dataspaces you need
    to call `RDDMSClient.upload_model` multiple times), a set of
    RESQML-objects, and a mapping of data arrays that are indexed by their
    path in resource (which is found in the RESQML-objects as well).

    Parameters
    ----------
    dataspace_uri
        An ETP dataspace uri.
    ml_objects
        A sequence of RESQML v2.0.1-objects.
    data_arrays
        A mapping, e.g., a dictionary, of data arrays where the paths in
        resource (found in the RESQML-objects) are the keys. Default is
        `{}`, meaning that only the RESQML-objects will be uploaded.
    handle_transaction
        A flag to toggle if `RDDMSClient.upload_model` should start and
        commit the transaction towards the dataspace. Default is `True`,
        and the method will ensure that the transaction handling is done
        correctly.
    debounce
        Parameter to decide if `RDDMSClient.upload_model` should retry
        starting a transaction if it initially fails. See
        `RDDMSClient.start_transaction` for a more in-depth explanation of
        the parameter. Default is `False`, i.e., no debouncing will occur
        and the method will fail if it is unable to start a transaction.

    Returns
    -------
    list[str]
        A list of ETP data object uris to the uploaded objects.

    See Also
    --------
    [`RDDMSClient.download_models`][rddms_io.client.RDDMSClient.download_models]:
        The reverse operation.

    [`RDDMSClient.start_transaction`][rddms_io.client.RDDMSClient.start_transaction]:
        Method for setting up a transaction. It is only necessary to
        interact with this method if `handle_transaction=False`.

    [`RDDMSClient.commit_transaction`][rddms_io.client.RDDMSClient.commit_transaction]:
        Method for committing a transaction. It is only necessary to
        interact with this method if `handle_transaction=False`.
    """
    if not ml_objects:
        return []

    dataspace_uri = str(DataspaceURI.from_any_etp_uri(dataspace_uri))

    if handle_transaction:
        transaction_uuid = await self.start_transaction(
            dataspace_uri=dataspace_uri,
            read_only=False,
            debounce=debounce,
        )

    ml_uris = await self._upload_model(
        dataspace_uri,
        ml_objects=ml_objects,
        data_arrays=data_arrays,
    )

    if handle_transaction:
        await self.commit_transaction(transaction_uuid=transaction_uuid)

    return ml_uris

download_models async

download_models(
    ml_uris: list[str | DataObjectURI],
    download_arrays: bool = False,
    download_linked_objects: bool = False,
) -> list[RDDMSModel]

Download RESQML-models from the RDDMS server. A model in this sense is a RESQML-object (with a given uri) and possibly with any connected arrays and referenced objects.

PARAMETER DESCRIPTION
ml_uris

A list of ETP data object uris.

TYPE: list[str | DataObjectURI]

download_arrays

A flag to toggle if any referenced arrays should be downloaded alongside the RESQML-objects. Setting to True will populate RDDMSModel.arrays-field with a dictionary with the path_in_hdf_file as the key, and the arrays as the values. If the flag is set to False no arrays will be downloaded, and the corresponding field will be empty. Default is False.

TYPE: bool DEFAULT: False

download_linked_objects

Flag to toggle if linked objects (target-objects), i.e., objects referenced by objects from ml_uris, should be downloaded. For example, setting the flag to True and passing in a single obj_Grid2dRepresentation-uri in ml_uris will try to download any linked coordinate systems or any other referenced objects. The linked objects will be added to RDDMSModel.linked_models, along with arrays if download_arrays=True. Note that if any of the linked objects are already in ml_uris they will be included both as a top-level RDDMSModel, and as a linked-model under a model that references it. The method only looks for objects linked one level down (corresponding to depth = 1 in GetResources), and it will ignore obj_EpcExternalPartReference- and EpcExternalPartReference-objects. Default is False, meaning no linked objects will be downloaded.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
list[RDDMSModel]

A list of RDDMSModel-objects.
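
A minimal sketch of walking the returned models, including linked ones. `Model` is a hypothetical stand-in for `RDDMSModel` that models only the `obj`, `arrays` and `linked_models` fields named on this page. Treating `linked_models` as a list is an assumption, and `array_paths` is an illustrative helper.

```python
from dataclasses import dataclass, field


@dataclass
class Model:
    # Hypothetical stand-in for `RDDMSModel`.
    obj: str
    arrays: dict = field(default_factory=dict)
    linked_models: list = field(default_factory=list)


def array_paths(models):
    """List (object, path) pairs for top-level and linked models."""
    pairs = []
    for model in models:
        pairs += [(model.obj, path) for path in model.arrays]
        pairs += array_paths(model.linked_models)
    return pairs


# `crs` appears both as a top-level model and as a linked model of `grid`,
# matching the duplication described for `download_linked_objects`.
crs = Model(obj="crs")
grid = Model(obj="grid", arrays={"/zvalues": [1.0, 2.0]}, linked_models=[crs])
pairs = array_paths([grid, crs])
```

Since linked objects can also appear at the top level, code that aggregates over all models may need to deduplicate by uri.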

Source code in src/rddms_io/client.py
async def download_models(
    self,
    ml_uris: list[str | DataObjectURI],
    download_arrays: bool = False,
    download_linked_objects: bool = False,
) -> list[RDDMSModel]:
    """
    Download RESQML-models from the RDDMS server.
    A model in this sense is a RESQML-object (with a given uri) and
    possibly with any connected arrays and referenced objects.

    Parameters
    ----------
    ml_uris
        A list of ETP data object uris.
    download_arrays
        A flag to toggle if any referenced arrays should be downloaded
        alongside the RESQML-objects. Setting to `True` will populate
        [`RDDMSModel.arrays`][rddms_io.data_types.RDDMSModel.arrays]-field
        with a dictionary with the `path_in_hdf_file` as the key, and the
        arrays as the values. If the flag is set to `False` no arrays will
        be downloaded, and the corresponding field will be empty. Default
        is `False`.
    download_linked_objects
        Flag to toggle if linked objects (target-objects), i.e., objects
        referenced by objects from `ml_uris`, should be downloaded. For
        example, setting the flag to `True` and passing in a single
        `obj_Grid2dRepresentation`-uri in
        the `ml_uris` will try to download any linked coordinate systems or
        any other referenced objects. The linked objects will be added to
        [`RDDMSModel.linked_models`][rddms_io.data_types.RDDMSModel.linked_models],
        along with arrays if `download_arrays=True`.
        Note that if any of the linked objects are already in `ml_uris`
        they will be included both as a top-level `RDDMSModel`, and as a
        linked-model under a model that references it. The method only
        looks for objects linked one level down (corresponding to `depth =
        1` in `GetResources`), and it will ignore
        `obj_EpcExternalPartReference`- and
        `EpcExternalPartReference`-objects.  Default is `False` meaning no
        linked objects will be downloaded.

    Returns
    -------
    list[RDDMSModel]
        A list of [`RDDMSModel`][rddms_io.data_types.RDDMSModel]-objects.
    """
    if len(ml_uris) == 0:
        raise ValueError("No uris in input 'ml_uris'")

    return await asyncio.gather(
        *[
            self._download_model(
                ml_uri=ml_uri,
                download_arrays=download_arrays,
                download_linked_objects=download_linked_objects,
            )
            for ml_uri in ml_uris
        ]
    )
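
The source above fans out one `_download_model` call per uri with `asyncio.gather`. As a minimal sketch of what that implies, the stand-in below shows that results come back in input order even when individual downloads finish out of order, and that an empty input raises. `fake_download_model` and `download_all` are hypothetical names used only for illustration; no server is contacted.

```python
import asyncio


async def fake_download_model(ml_uri: str, delay: float) -> str:
    # Hypothetical stand-in for RDDMSClient._download_model.
    await asyncio.sleep(delay)
    return f"model for {ml_uri}"


async def download_all(ml_uris: list[str]) -> list[str]:
    if len(ml_uris) == 0:
        raise ValueError("No uris in input 'ml_uris'")
    # Later uris get shorter delays, so they complete first.
    delays = [0.01 * (len(ml_uris) - i) for i in range(len(ml_uris))]
    # gather runs the coroutines concurrently and returns results in the
    # order the awaitables were passed in, not in completion order.
    return await asyncio.gather(
        *[fake_download_model(uri, delay) for uri, delay in zip(ml_uris, delays)]
    )


results = asyncio.run(download_all(["uri-a", "uri-b", "uri-c"]))
```

Because the result list lines up with `ml_uris`, it should be safe to `zip` the input uris with the returned models.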

delete_model async

delete_model(
    ml_uris: list[str | DataObjectURI],
    prune_contained_objects: bool = False,
    handle_transaction: bool = True,
    debounce: bool | float = False,
) -> None

Method used for deleting a set of objects on an ETP server. In order for the deletion to be successful the objects to be deleted cannot leave any dangling source-objects. That is, there can be no objects left on the ETP server that reference the deleted objects.

PARAMETER DESCRIPTION
ml_uris

A list of ETP data object uris to delete.

TYPE: list[str | DataObjectURI]

prune_contained_objects

See section 9.3.4 in the ETP v1.2 standards documentation for an accurate description of this parameter. Default is False meaning no pruning is done.

TYPE: bool DEFAULT: False

handle_transaction

A flag to toggle if RDDMSClient.delete_model should start and commit the transaction towards the dataspace. Default is True, and the method will ensure that the transaction handling is done correctly.

TYPE: bool DEFAULT: True

debounce

Parameter to decide if RDDMSClient.delete_model should retry starting a transaction if it initially fails. See RDDMSClient.start_transaction for a more in-depth explanation of the parameter. Default is False, i.e., no debouncing will occur and the method will fail if it is unable to start a transaction.

TYPE: bool | float DEFAULT: False

Source code in src/rddms_io/client.py
async def delete_model(
    self,
    ml_uris: list[str | DataObjectURI],
    prune_contained_objects: bool = False,
    handle_transaction: bool = True,
    debounce: bool | float = False,
) -> None:
    """
    Method used for deleting a set of objects on an ETP server. In order
    for the deletion to be successful the objects to be deleted cannot
    leave any dangling source-objects. That is, there can be no objects
    left on the ETP server that reference the deleted objects.

    Parameters
    ----------
    ml_uris
        A list of ETP data object uris to delete.
    prune_contained_objects
        See section 9.3.4 in the ETP v1.2 standards documentation for an
        accurate description of this parameter. Default is `False` meaning
        no pruning is done.
    handle_transaction
        A flag to toggle if `RDDMSClient.delete_model` should start and
        commit the transaction towards the dataspace. Default is `True`,
        and the method will ensure that the transaction handling is done
        correctly.
    debounce
        Parameter to decide if `RDDMSClient.delete_model` should retry
        starting a transaction if it initially fails. See
        `RDDMSClient.start_transaction` for a more in-depth explanation of
        the parameter. Default is `False`, i.e., no debouncing will occur
        and the method will fail if it is unable to start a transaction.
    """
    if not ml_uris:
        return

    uris = list(map(str, ml_uris))
    ddo = DeleteDataObjects(
        uris=dict(zip(uris, uris)),
        prune_contained_objects=prune_contained_objects,
    )

    if handle_transaction:
        dataspace_uris = [str(DataspaceURI.from_any_etp_uri(u)) for u in ml_uris]
        assert all([dataspace_uris[0] == du for du in dataspace_uris])
        transaction_uuid = await self.start_transaction(
            dataspace_uri=dataspace_uris[0], read_only=False, debounce=debounce
        )

    responses = await self.etp_client.send_and_recv(ddo)

    parse_and_raise_response_errors(
        responses,
        DeleteDataObjectsResponse,
        "RDDMSClient.delete_model",
    )

    if handle_transaction:
        await self.commit_transaction(transaction_uuid)
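
With `handle_transaction=True` the source above asserts that every uri belongs to the same dataspace. One way to stay within that constraint is to group the uris per dataspace first and call `delete_model` once per group. The sketch below assumes the uri layout `eml:///dataspace('<path>')/<type>(<uuid>)`; `dataspace_of`, `group_by_dataspace` and `delete_per_dataspace` are hypothetical helpers, not part of the library, and `client` is assumed to be a connected `RDDMSClient`.

```python
import re
from collections import defaultdict
from typing import Any

# Assumed uri layout: eml:///dataspace('<path>')/<type>(<uuid>).
_DATASPACE_RE = re.compile(r"^eml:///dataspace\('([^']*)'\)")


def dataspace_of(uri: str) -> str:
    """Return the dataspace prefix of a uri ('eml:///' if none is given)."""
    match = _DATASPACE_RE.match(uri)
    return match.group(0) if match else "eml:///"


def group_by_dataspace(ml_uris: list[Any]) -> dict[str, list[str]]:
    groups: dict[str, list[str]] = defaultdict(list)
    for uri in map(str, ml_uris):
        groups[dataspace_of(uri)].append(uri)
    return dict(groups)


async def delete_per_dataspace(client: Any, ml_uris: list[Any]) -> None:
    # One delete_model call (and thus one transaction) per dataspace.
    for uris in group_by_dataspace(ml_uris).values():
        await client.delete_model(ml_uris=uris, handle_transaction=True)
```

Each group is deleted in its own transaction, so a failure in one dataspace does not roll back deletions already committed in another.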

Synchronous client implementation

The synchronous client RDDMSClientSync is a thin wrapper around RDDMSClient. The main difference is that every method on the synchronous client will set up a connection towards the server, call the relevant method, and then tear down the connection. If you find yourself running multiple methods after one another, it might be beneficial to use RDDMSClient instead.
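
To illustrate the cost of the connect, call, tear down cycle, here is a minimal self-contained sketch that counts how many sessions are opened. `FakeAsyncClient`, `fake_connect` and `FakeSyncClient` are hypothetical stand-ins for `RDDMSClient`, `rddms_connect` and `RDDMSClientSync`; nothing here talks to a server.

```python
import asyncio
from contextlib import asynccontextmanager

sessions_opened = 0


class FakeAsyncClient:
    # Hypothetical stand-in for RDDMSClient.
    async def list_dataspaces(self) -> list[str]:
        return ["eml:///dataspace('foo/bar')"]


@asynccontextmanager
async def fake_connect():
    # Hypothetical stand-in for rddms_connect; counts session setups.
    global sessions_opened
    sessions_opened += 1
    yield FakeAsyncClient()


class FakeSyncClient:
    # Same wrapper pattern as the synchronous client: every method call
    # opens a connection, awaits the async method, and closes again.
    def list_dataspaces(self) -> list[str]:
        async def _call() -> list[str]:
            async with fake_connect() as client:
                return await client.list_dataspaces()

        return asyncio.run(_call())


async def two_calls_one_session() -> None:
    async with fake_connect() as client:
        await client.list_dataspaces()
        await client.list_dataspaces()


sync_client = FakeSyncClient()
sync_client.list_dataspaces()
sync_client.list_dataspaces()
sync_sessions = sessions_opened  # two calls opened two sessions

asyncio.run(two_calls_one_session())
async_sessions = sessions_opened - sync_sessions  # two calls, one session
```

Since this wrapper pattern relies on `asyncio.run`, such methods cannot be called from code that is already running inside an event loop on the same thread; in that situation the asynchronous client is the natural choice.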

rddms_io.sync_client.RDDMSClientSync

RDDMSClientSync(
    uri: str,
    data_partition_id: str | None = None,
    authorization: str | SecretStr | None = None,
    etp_timeout: float | None = None,
    max_message_size: float = 2**20,
    use_compression: bool = True,
)

Synchronous version of the RDDMSClient. The purpose of this client is to serve the same high-level endpoints towards the RDDMS server as RDDMSClient, but without the need to use async and await. Only the methods (and parameters) that can be wrapped in a single call are included in RDDMSClientSync. The constructor takes the same parameters as rddms_connect, and the methods are then called without using await.

PARAMETER DESCRIPTION
uri

The uri to the RDDMS server. This should be the uri to a websockets endpoint to an ETP server.

TYPE: str

data_partition_id

The data partition id used when connecting to the OSDU open-etp-server in multi-partition mode. Default is None.

TYPE: str | None DEFAULT: None

authorization

Bearer token used for authenticating to the RDDMS server. This token should be on the form "Bearer 1234...". Default is None.

TYPE: str | SecretStr | None DEFAULT: None

etp_timeout

The timeout in seconds for when to stop waiting for a message from the server. Setting it to None will persist the connection indefinitely. Default is None.

TYPE: float | None DEFAULT: None

max_message_size

The maximum number of bytes for a single websockets message. Default is 2**20 corresponding to 1 MiB.

TYPE: float DEFAULT: 2 ** 20

use_compression

Flag to toggle if compression of the messages should be applied. So far the client (and the server) only supports compression with gzip. Default is True and compression is applied.

TYPE: bool DEFAULT: True

Notes

The authorization token (obtained via, e.g., msal) will have an expiration time. Once the token has expired, the client needs to be updated with a fresh token.

Whenever you call one of the methods of this client, it will set up a new ETP session, call the relevant method from the asynchronous client, and then tear down the connection. If you find yourself repeatedly calling multiple methods in succession, consider switching to the asynchronous client as this will be much faster.

See Also

RDDMSClient: The asynchronous driver class which RDDMSClientSync wraps.

rddms_connect: The connection class to set up the asynchronous RDDMSClient.

Source code in src/rddms_io/sync_client.py
def __init__(
    self,
    uri: str,
    data_partition_id: str | None = None,
    authorization: str | SecretStr | None = None,
    etp_timeout: float | None = None,
    max_message_size: float = 2**20,
    use_compression: bool = True,
) -> None:
    if isinstance(authorization, SecretStr):
        authorization = authorization
    else:
        authorization = SecretStr(authorization)

    self.connection_args = dict(
        uri=uri,
        data_partition_id=data_partition_id,
        authorization=authorization,
        etp_timeout=etp_timeout,
        max_message_size=max_message_size,
        use_compression=use_compression,
    )

connection_args instance-attribute

connection_args = dict(
    uri=uri,
    data_partition_id=data_partition_id,
    authorization=authorization,
    etp_timeout=etp_timeout,
    max_message_size=max_message_size,
    use_compression=use_compression,
)

create_dataspace

create_dataspace(
    dataspace_uri: str | DataspaceURI,
    legal_tags: list[str] = [],
    other_relevant_data_countries: list[str] = [],
    owners: list[str] = [],
    viewers: list[str] = [],
    ignore_if_exists: bool = False,
) -> None

Method creating a new dataspace on the ETP server. This function is limited to creating a single dataspace with optional access-control list (ACL) information.

PARAMETER DESCRIPTION
dataspace_uri

The ETP dataspace uri, or path, to create. If it is a dataspace path (on the form 'foo/bar') it will be converted to the dataspace uri "eml:///dataspace('foo/bar')".

TYPE: str | DataspaceURI

legal_tags

List of legal tag strings for the ACL. The default is an empty list.

TYPE: list[str] DEFAULT: []

other_relevant_data_countries

List of data countries for the ACL. The default is an empty list.

TYPE: list[str] DEFAULT: []

owners

List of owners for the ACL. The default is an empty list.

TYPE: list[str] DEFAULT: []

viewers

List of viewers for the ACL. The default is an empty list.

TYPE: list[str] DEFAULT: []

ignore_if_exists

When True the method silently ignores any ETPError with error code 5 (EINVALID_ARGUMENT). This error occurs if the dataspace already exists on the server. Otherwise, all errors are raised. Default is False.

TYPE: bool DEFAULT: False

See Also

RDDMSClient.create_dataspace: The asynchronous counterpart.
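
As a small illustration of the path-to-uri conversion and of `ignore_if_exists`, the sketch below wraps `create_dataspace` in an idempotent helper. `to_dataspace_uri` and `ensure_dataspace` are hypothetical helpers (the library performs its own conversion internally), and `client` is assumed to be an `RDDMSClientSync` instance.

```python
from typing import Any


def to_dataspace_uri(path_or_uri: str) -> str:
    # Hypothetical helper mirroring the documented conversion:
    # 'foo/bar' -> "eml:///dataspace('foo/bar')".
    if path_or_uri.startswith("eml:///"):
        return path_or_uri
    return f"eml:///dataspace('{path_or_uri}')"


def ensure_dataspace(client: Any, path_or_uri: str) -> str:
    # `client` is assumed to be an RDDMSClientSync. With
    # ignore_if_exists=True, repeated calls do not raise when the
    # dataspace is already present on the server.
    uri = to_dataspace_uri(path_or_uri)
    client.create_dataspace(dataspace_uri=uri, ignore_if_exists=True)
    return uri
```

Calling `ensure_dataspace(client, "foo/bar")` at the start of a script lets the same script run repeatedly against the same server.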

Source code in src/rddms_io/sync_client.py
def create_dataspace(
    self,
    dataspace_uri: str | DataspaceURI,
    legal_tags: list[str] = [],
    other_relevant_data_countries: list[str] = [],
    owners: list[str] = [],
    viewers: list[str] = [],
    ignore_if_exists: bool = False,
) -> None:
    """
    Method creating a new dataspace on the ETP server. This function is
    limited to creating a single dataspace with optional access-control
    list (ACL) information.

    Parameters
    ----------
    dataspace_uri
        The ETP dataspace uri, or path, to create. If it is a dataspace
        path (on the form `'foo/bar'`) it will be converted to the
        dataspace uri `"eml:///dataspace('foo/bar')"`.
    legal_tags
        List of legal tag strings for the ACL. The default is an empty
        list.
    other_relevant_data_countries: list[str]
        List of data countries for the ACL. The default is an empty list.
    owners
        List of owners for the ACL. The default is an empty list.
    viewers
        List of viewers for the ACL. The default is an empty list.
    ignore_if_exists
        When `True` the method silently ignores any `ETPError` with error
        code `5` (`EINVALID_ARGUMENT`). This error occurs if the dataspace
        already exists on the server. Otherwise, all errors are raised.
        Default is `False`.

    See Also
    --------
    [`RDDMSClient.create_dataspace`][rddms_io.client.RDDMSClient.create_dataspace]:
        The asynchronous counterpart.
    """

    async def create_dataspace() -> None:
        async with rddms_connect(**self.connection_args) as rddms_client:
            return await rddms_client.create_dataspace(
                dataspace_uri=dataspace_uri,
                legal_tags=legal_tags,
                other_relevant_data_countries=other_relevant_data_countries,
                owners=owners,
                viewers=viewers,
                ignore_if_exists=ignore_if_exists,
            )

    return asyncio.run(create_dataspace())

delete_dataspace

delete_dataspace(dataspace_uri: DataspaceURI | str) -> None

Method deleting a dataspace.

PARAMETER DESCRIPTION
dataspace_uri

The ETP dataspace uri, or path, for the dataspace to delete. If it is a dataspace path (on the form 'foo/bar') it will be converted to the dataspace uri "eml:///dataspace('foo/bar')".

TYPE: DataspaceURI | str

See Also

RDDMSClient.delete_dataspace: The asynchronous version of this method.

Source code in src/rddms_io/sync_client.py
def delete_dataspace(self, dataspace_uri: DataspaceURI | str) -> None:
    """
    Method deleting a dataspace.

    Parameters
    ----------
    dataspace_uri
        The ETP dataspace uri, or path, for the dataspace to delete. If it
        is a dataspace path (on the form `'foo/bar'`) it will be converted
        to the dataspace uri `"eml:///dataspace('foo/bar')"`.

    See Also
    --------
    [`RDDMSClient.delete_dataspace`][rddms_io.client.RDDMSClient.delete_dataspace]:
        The asynchronous version of this method.
    """

    async def delete_dataspace() -> None:
        async with rddms_connect(**self.connection_args) as rddms_client:
            return await rddms_client.delete_dataspace(dataspace_uri=dataspace_uri)

    return asyncio.run(delete_dataspace())

list_dataspaces

list_dataspaces(
    store_last_write_filter: datetime | int | None = None,
) -> list[Dataspace]

Method used to list all dataspaces on the ETP-server.

PARAMETER DESCRIPTION
store_last_write_filter

A parameter that can be used to limit the results to only include dataspaces that were written to after the time specified in the filter. The default is None, meaning all dataspaces will be included.

TYPE: datetime | int | None DEFAULT: None

RETURNS DESCRIPTION
list[Dataspace]

A list of ETP Dataspace-data objects. See section 23.43.10 of the ETP v1.2 standards documentation for an accurate description of the different fields.

See Also

RDDMSClient.list_dataspaces: The asynchronous version of this method.

Source code in src/rddms_io/sync_client.py
def list_dataspaces(
    self, store_last_write_filter: datetime.datetime | int | None = None
) -> list[Dataspace]:
    """
    Method used to list all dataspaces on the ETP-server.

    Parameters
    ----------
    store_last_write_filter
        A parameter that can be used to limit the results to only include
        dataspaces that were written to after the time specified in the
        filter. The default is `None`, meaning all dataspaces will be
        included.

    Returns
    -------
    list[Dataspace]
        A list of ETP `Dataspace`-data objects. See section 23.43.10 of the
        ETP v1.2 standards documentation for an accurate description of the
        different fields.

    See Also
    --------
    [`RDDMSClient.list_dataspaces`][rddms_io.client.RDDMSClient.list_dataspaces]:
        The asynchronous version of this method.
    """

    async def list_dataspaces() -> list[Dataspace]:
        async with rddms_connect(**self.connection_args) as rddms_client:
            return await rddms_client.list_dataspaces(
                store_last_write_filter=store_last_write_filter
            )

    return asyncio.run(list_dataspaces())

list_objects_under_dataspace

list_objects_under_dataspace(
    dataspace_uri: DataspaceURI | str,
    data_object_types: list[str | Type[AbstractCitedDataObject]] = [],
    count_objects: bool = True,
    store_last_write_filter: int | None = None,
) -> list[Resource]

This method will list all objects under a given dataspace.

PARAMETER DESCRIPTION
dataspace_uri

The uri of the dataspace to list objects under.

TYPE: DataspaceURI | str

data_object_types

Object types to look for. This can either be a list of strings, e.g., ["eml20.*", "resqml20.obj_Grid2dRepresentation"] to query all Energistics Common version 2.0-objects and obj_Grid2dRepresentation-objects from RESQML v2.0.1, or it can be a list of objects from resqml_objects.v201, e.g., [ro.obj_Grid2dRepresentation, ro.obj_LocalDepth3dCrs]. Default is [], i.e., an empty list which means that all data object types will be returned.

TYPE: list[str | Type[AbstractCitedDataObject]] DEFAULT: []

count_objects

Toggle if the number of target and source objects should be counted. Default is True.

TYPE: bool DEFAULT: True

store_last_write_filter

Filter to only include objects that are written after the provided datetime or timestamp. Default is None, meaning no filter is applied. Note that the timestamp should be in microsecond resolution.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
list[Resource]

A list of Resource-objects.

See Also

RDDMSClient.list_objects_under_dataspace: The asynchronous version of this method.
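
Since `store_last_write_filter` is an integer timestamp in microsecond resolution here, a small conversion helper can be handy. The sketch below assumes the timestamp counts microseconds since the Unix epoch in UTC; `to_microseconds` is a hypothetical helper, not part of the library.

```python
from datetime import datetime, timedelta, timezone

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_microseconds(moment: datetime) -> int:
    # Reject naive datetimes so the local timezone is never guessed.
    if moment.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    # Integer division of timedeltas avoids float rounding.
    return (moment - _EPOCH) // timedelta(microseconds=1)


# Example filter value: everything written during the last 24 hours.
one_day_ago = to_microseconds(datetime.now(timezone.utc) - timedelta(days=1))
```

The resulting integer can then be passed as `store_last_write_filter=one_day_ago`.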

Source code in src/rddms_io/sync_client.py
def list_objects_under_dataspace(
    self,
    dataspace_uri: DataspaceURI | str,
    data_object_types: list[str | typing.Type[ro.AbstractCitedDataObject]] = [],
    count_objects: bool = True,
    store_last_write_filter: int | None = None,
) -> list[Resource]:
    """
    This method will list all objects under a given dataspace.

    Parameters
    ----------
    dataspace_uri
        The uri of the dataspace to list objects under.
    data_object_types
        Object types to look for. This can either be a list of strings,
        e.g., `["eml20.*", "resqml20.obj_Grid2dRepresentation"]` to query
        all Energistics Common version 2.0-objects and
        `obj_Grid2dRepresentation`-objects from RESQML v2.0.1, or it can be
        a list of objects from `resqml_objects.v201`, e.g.,
        `[ro.obj_Grid2dRepresentation, ro.obj_LocalDepth3dCrs]`. Default is
        `[]`, i.e., an empty list which means that all data object types
        will be returned.
    count_objects
        Toggle if the number of target and source objects should be
        counted. Default is `True`.
    store_last_write_filter
        Filter to only include objects that are written after the provided
        datetime or timestamp. Default is `None`, meaning no filter is
        applied. Note that the timestamp should be in microsecond
        resolution.

    Returns
    -------
    list[Resource]
        A list of
        [`Resource`][energistics.etp.v12.datatypes.object.Resource]-objects.

    See Also
    --------
    [`RDDMSClient.list_objects_under_dataspace`][rddms_io.client.RDDMSClient.list_objects_under_dataspace]:
        The asynchronous version of this method.
    """

    async def list_objects_under_dataspace() -> list[Resource]:
        async with rddms_connect(**self.connection_args) as rddms_client:
            return await rddms_client.list_objects_under_dataspace(
                dataspace_uri=dataspace_uri,
                data_object_types=data_object_types,
                count_objects=count_objects,
                store_last_write_filter=store_last_write_filter,
            )

    return asyncio.run(list_objects_under_dataspace())

list_linked_objects

list_linked_objects(
    start_uri: DataObjectURI | str,
    data_object_types: list[str | Type[AbstractCitedDataObject]] = [],
    store_last_write_filter: datetime | int | None = None,
    depth: int = 1,
) -> LinkedObjects

Method listing all objects that are linked to the provided object uri. That is, starting from the object indexed by the uri start_uri it finds all objects (sources) that link to it, and all objects (targets) it links to.

PARAMETER DESCRIPTION
start_uri

An ETP data object uri to start the query from.

TYPE: DataObjectURI | str

data_object_types

A filter to limit which types of objects to include in the results. As a string it is on the form eml20.obj_EpcExternalPartReference for a specific object, or eml20.* for all Energistics Common objects. For the RESQML v2.0.1 objects it is similarly resqml20.*, or a specific type instead of the wildcard *. This can also be classes from resqml_objects.v201, in which case the filter will be constructed. Default is [], meaning no filter is applied.

TYPE: list[str | Type[AbstractCitedDataObject]] DEFAULT: []

store_last_write_filter

Filter to only include objects that are written after the provided datetime or timestamp. Default is None, meaning no filter is applied. Note that the timestamp should be in microsecond resolution.

TYPE: datetime | int | None DEFAULT: None

depth

The number of links to return. Setting depth = 1 will only return targets and sources that are directly linked to the start object. With depth = 2 we get links to objects that link to the targets and sources of the start object. Default is 1.

TYPE: int DEFAULT: 1

RETURNS DESCRIPTION
LinkedObjects

A container (NamedTuple) with resources and edges for the sources and targets of the start-object.

See Also

RDDMSClient.list_linked_objects: The asynchronous version of this method.

Source code in src/rddms_io/sync_client.py
def list_linked_objects(
    self,
    start_uri: DataObjectURI | str,
    data_object_types: list[str | typing.Type[ro.AbstractCitedDataObject]] = [],
    store_last_write_filter: datetime.datetime | int | None = None,
    depth: int = 1,
) -> LinkedObjects:
    """
    Method listing all objects that are linked to the provided object uri.
    That is, starting from the object indexed by the uri `start_uri` it
    finds all objects (sources) that link to it, and all objects (targets)
    it links to.

    Parameters
    ----------
    start_uri: DataObjectURI | str
        An ETP data object uri to start the query from.
    data_object_types: list[str | typing.Type[ro.AbstractCitedDataObject]]
        A filter to limit which types of objects to include in the results.
        As a string it is on the form `eml20.obj_EpcExternalPartReference`
        for a specific object, or `eml20.*` for all Energistics Common
        objects. For the RESQML v2.0.1 objects it is similarly
        `resqml20.*`, or a specific type instead of the wildcard `*`. This
        can also be classes from `resqml_objects.v201`, in which case the
        filter will be constructed. Default is `[]`, meaning no filter is
        applied.
    store_last_write_filter: datetime.datetime | int | None
        Filter to only include objects that are written after the provided
        datetime or timestamp. Default is `None`, meaning no filter is
        applied. Note that the timestamp should be in microsecond
        resolution.
    depth: int
        The number of links to return. Setting `depth = 1` will only return
        targets and sources that are directly linked to the start object.
        With `depth = 2` we get links to objects that link to the targets
        and sources of the start object. Default is `1`.

    Returns
    -------
    LinkedObjects
        A container (`NamedTuple`) with resources and edges for the sources
        and targets of the start-object.

    See Also
    --------
    [`RDDMSClient.list_linked_objects`][rddms_io.client.RDDMSClient.list_linked_objects]:
        The asynchronous version of this method.
    """

    async def list_linked_objects() -> LinkedObjects:
        async with rddms_connect(**self.connection_args) as rddms_client:
            return await rddms_client.list_linked_objects(
                start_uri=start_uri,
                data_object_types=data_object_types,
                store_last_write_filter=store_last_write_filter,
                depth=depth,
            )

    return asyncio.run(list_linked_objects())

list_array_metadata

list_array_metadata(
    ml_uris: list[str | DataObjectURI],
) -> dict[str, dict[str, DataArrayMetadata]]

Method used for listing array metadata for all connected arrays to the provided data object uris. This method downloads the data objects from the uris, and calls RDDMSClient.list_object_array_metadata to get the actual metadata. If the objects have already been downloaded, then using RDDMSClientSync.list_object_array_metadata will be more efficient.

The purpose of this method is to provide a more convenient way of exploring an RDDMS server without needing to handle data objects. It is recommended to use RDDMSClientSync.list_object_array_metadata if the objects have already been downloaded.

PARAMETER DESCRIPTION
ml_uris

A list of ETP data object uris.

TYPE: list[str | DataObjectURI]

RETURNS DESCRIPTION
dict[str, dict[str, DataArrayMetadata]]

A dictionary indexed by the data object uri, containing a new dictionary with the path in resource as the key and the metadata (the ETP datatype DataArrayMetadata) as the value. Note that if there is no array connected to a data object uri, there will be no entry in the returned dict for this uri.

See Also

RDDMSClient.list_array_metadata: The asynchronous version of this method.

RDDMSClientSync.list_object_array_metadata: A similar method that fetches the metadata from the objects themselves along with a dataspace uri. It is recommended to use list_object_array_metadata if you already have the objects in memory.
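
The nested return value can be awkward to loop over directly. As a minimal sketch, the helpers below flatten it into `(uri, path, metadata)` tuples and report which requested uris have no entry, which per the description above means no connected arrays. `iter_array_metadata` and `uris_without_arrays` are hypothetical helpers, and it is an assumption that the dictionary keys equal `str(uri)` of the inputs.

```python
from typing import Any, Iterator


def iter_array_metadata(
    metadata: dict[str, dict[str, Any]],
) -> Iterator[tuple[str, str, Any]]:
    # Walk the outer dict (per data object uri) and the inner dict
    # (per path in resource), yielding one tuple per array.
    for uri, arrays in metadata.items():
        for path_in_resource, array_metadata in arrays.items():
            yield uri, path_in_resource, array_metadata


def uris_without_arrays(
    ml_uris: list[Any], metadata: dict[str, dict[str, Any]]
) -> list[str]:
    # Uris missing from the result have no connected arrays.
    return [uri for uri in map(str, ml_uris) if uri not in metadata]
```

For example, `list(iter_array_metadata(client.list_array_metadata(ml_uris=uris)))` gives a flat list that is easy to print or filter.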

Source code in src/rddms_io/sync_client.py
def list_array_metadata(
    self,
    ml_uris: list[str | DataObjectURI],
) -> dict[str, dict[str, DataArrayMetadata]]:
    """
    Method used for listing array metadata for all connected arrays to the
    provided data object uris. This method downloads the data objects from
    the uris, and calls `RDDMSClient.list_object_array_metadata` to get the
    actual metadata. If the objects have already been downloaded, then
    using `RDDMSClientSync.list_object_array_metadata` will be more
    efficient.

    The purpose of this method is to provide a more convenient way of
    exploring an RDDMS server without needing to handle data objects. It is
    recommended to use `RDDMSClientSync.list_object_array_metadata` if the
    objects have already been downloaded.

    Parameters
    ----------
    ml_uris
        A list of ETP data object uris.

    Returns
    -------
    dict[str, dict[str, DataArrayMetadata]]
        A dictionary indexed by the data object uri, containing a new
        dictionary with the path in resource as the key and the metadata
        (the ETP datatype `DataArrayMetadata`) as the value. Note that if
        there is no array connected to a data object uri, there will be no
        entry in the returned dict for this uri.

    See Also
    --------
    [`RDDMSClient.list_array_metadata`][rddms_io.client.RDDMSClient.list_array_metadata]:
        The asynchronous version of this method.

    [`RDDMSClientSync.list_object_array_metadata`][rddms_io.sync_client.RDDMSClientSync.list_object_array_metadata]:
        A similar method that fetches the metadata from the objects
        themselves along with a dataspace uri. It is recommended to use
        `list_object_array_metadata` if you already have the objects in
        memory.
    """

    async def list_array_metadata() -> dict[str, dict[str, DataArrayMetadata]]:
        async with rddms_connect(**self.connection_args) as rddms_client:
            return await rddms_client.list_array_metadata(ml_uris=ml_uris)

    return asyncio.run(list_array_metadata())

list_object_array_metadata

list_object_array_metadata(
    dataspace_uri: str | DataspaceURI,
    ml_objects: Sequence[AbstractCitedDataObject],
) -> dict[str, dict[str, DataArrayMetadata]]

Method used for listing array metadata for all connected arrays to the provided RESQML-objects. This method works by taking in a dataspace uri and the objects themselves (instead of their uris) as they would need to be downloaded to look up which arrays they link to.

PARAMETER DESCRIPTION
dataspace_uri

The ETP dataspace uri where the objects are located.

TYPE: str | DataspaceURI

ml_objects

A list (or any sequence) of objects that link to arrays.

TYPE: Sequence[AbstractCitedDataObject]

RETURNS DESCRIPTION
dict[str, dict[str, DataArrayMetadata]]

A dictionary indexed by the data object uri, containing a new dictionary with the path in resource as the key and the metadata (the ETP datatype DataArrayMetadata) as the value.

See Also

RDDMSClient.list_object_array_metadata: The asynchronous version of this method.

RDDMSClientSync.list_array_metadata: A similar method that looks up array metadata needing only the uris of the objects.

Source code in src/rddms_io/sync_client.py
def list_object_array_metadata(
    self,
    dataspace_uri: str | DataspaceURI,
    ml_objects: Sequence[ro.AbstractCitedDataObject],
) -> dict[str, dict[str, DataArrayMetadata]]:
    """
    Method used for listing array metadata for all connected arrays to the
    provided RESQML-objects. This method works by taking in a dataspace uri
    and the objects themselves (instead of their uris) as they would need
    to be downloaded to look up which arrays they link to.

    Parameters
    ----------
    dataspace_uri: str | DataspaceURI
        The ETP dataspace uri where the objects are located.
    ml_objects: Sequence[ro.AbstractCitedDataObject]
        A list (or any sequence) of objects that link to arrays.

    Returns
    -------
    dict[str, dict[str, DataArrayMetadata]]
        A dictionary indexed by the data object uri, containing a new
        dictionary with the path in resource as the key and the metadata
        (the ETP datatype `DataArrayMetadata`) as the value.

    See Also
    --------
    [`RDDMSClient.list_object_array_metadata`][rddms_io.client.RDDMSClient.list_object_array_metadata]:
        The asynchronous version of this method.

    [`RDDMSClientSync.list_array_metadata`][rddms_io.sync_client.RDDMSClientSync.list_array_metadata]:
        A similar method that looks up array metadata needing only the uris
        of the objects.
    """

    async def list_object_array_metadata() -> dict[
        str, dict[str, DataArrayMetadata]
    ]:
        async with rddms_connect(**self.connection_args) as rddms_client:
            return await rddms_client.list_object_array_metadata(
                dataspace_uri=dataspace_uri, ml_objects=ml_objects
            )

    return asyncio.run(list_object_array_metadata())

delete_model

delete_model(
    ml_uris: list[str | DataObjectURI],
    prune_contained_objects: bool = False,
    debounce: bool | float = False,
) -> None

Method used for deleting a set of objects on an ETP server. In order for the deletion to be successful the objects to be deleted cannot leave any dangling source-objects. That is, there can be no objects left on the ETP server that reference the deleted objects.

PARAMETER DESCRIPTION
ml_uris

A list of ETP data object uris to delete.

TYPE: list[str | DataObjectURI]

prune_contained_objects

See section 9.3.4 in the ETP v1.2 standards documentation for an accurate description of this parameter. Default is False meaning no pruning is done.

TYPE: bool DEFAULT: False

debounce

Parameter to decide if RDDMSClient.delete_model should retry starting a transaction if it initially fails. See RDDMSClient.start_transaction for a more in-depth explanation of the parameter. Default is False, i.e., no debouncing will occur and the method will fail if it is unable to start a transaction.

TYPE: bool | float DEFAULT: False

See Also

RDDMSClient.delete_model: The asynchronous version of this method.
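
The dangling-reference rule described above can be checked on the client side before attempting a deletion. The following is a minimal sketch, assuming the references between objects have already been gathered into a plain dictionary that maps each object uri to the uris it references. The helper `find_dangling_sources` and the short example uris are hypothetical and not part of `rddms_io`.

```python
def find_dangling_sources(
    references: dict[str, set[str]], to_delete: set[str]
) -> dict[str, set[str]]:
    """Map each uri in `to_delete` to the remaining sources that reference it."""
    dangling: dict[str, set[str]] = {}
    for source, targets in references.items():
        if source in to_delete:
            # The source is deleted as well, so it cannot be left dangling.
            continue
        for target in targets & to_delete:
            dangling.setdefault(target, set()).add(source)
    return dangling


# Hypothetical uris: a grid references a coordinate system (crs).
references = {"grid": {"crs"}, "crs": set()}

# Deleting only the crs would leave the grid pointing at a missing object.
blocked = find_dangling_sources(references, {"crs"})

# Deleting both objects together leaves nothing dangling.
allowed = find_dangling_sources(references, {"grid", "crs"})
```

An empty result suggests that the set of uris can be deleted together without leaving sources behind; the server remains the authority on whether the deletion succeeds.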

Source code in src/rddms_io/sync_client.py
def delete_model(
    self,
    ml_uris: list[str | DataObjectURI],
    prune_contained_objects: bool = False,
    debounce: bool | float = False,
) -> None:
    """
    Method used for deleting a set of objects on an ETP server. For the
    deletion to succeed, the objects to be deleted cannot leave any
    dangling source-objects. That is, no objects may be left on the ETP
    server that reference the deleted objects.

    Parameters
    ----------
    ml_uris
        A list of ETP data object uris to delete.
    prune_contained_objects
        See section 9.3.4 in the ETP v1.2 standards documentation for an
        accurate description of this parameter. Default is `False` meaning
        no pruning is done.
    debounce
        Parameter to decide if `RDDMSClient.delete_model` should retry
        starting a transaction if it initially fails. See
        `RDDMSClient.start_transaction` for a more in-depth explanation of
        the parameter. Default is `False`, i.e., no debouncing will occur
        and the method will fail if it is unable to start a transaction.

    See Also
    --------
    [`RDDMSClient.delete_model`][rddms_io.client.RDDMSClient.delete_model]:
        The asynchronous version of this method.
    """

    async def delete_model() -> None:
        async with rddms_connect(**self.connection_args) as rddms_client:
            return await rddms_client.delete_model(
                ml_uris=ml_uris,
                prune_contained_objects=prune_contained_objects,
                handle_transaction=True,
                debounce=debounce,
            )

    return asyncio.run(delete_model())
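
The synchronous methods on this page share one pattern: an inner coroutine opens a connection, awaits the asynchronous method, and is run to completion with `asyncio.run`. The sketch below reproduces that pattern with stand-ins so it runs without a server. `FakeAsyncClient`, `fake_connect`, and `FakeSyncClient` are hypothetical names, and the stand-in `delete_model` returns a count purely for illustration (the real method returns `None`).

```python
import asyncio
from contextlib import asynccontextmanager


class FakeAsyncClient:
    """Hypothetical stand-in for the asynchronous client."""

    async def delete_model(self, ml_uris: list[str]) -> int:
        await asyncio.sleep(0)  # Simulate a round trip to the server.
        return len(ml_uris)


@asynccontextmanager
async def fake_connect(**connection_args):
    """Hypothetical stand-in for `rddms_connect` used as a context manager."""
    client = FakeAsyncClient()
    try:
        yield client
    finally:
        # A real connection would be closed here.
        pass


class FakeSyncClient:
    def __init__(self, **connection_args) -> None:
        self.connection_args = connection_args

    def delete_model(self, ml_uris: list[str]) -> int:
        async def delete_model() -> int:
            # A new connection is opened and closed for every call.
            async with fake_connect(**self.connection_args) as client:
                return await client.delete_model(ml_uris=ml_uris)

        # Create an event loop, run the coroutine to completion, close the loop.
        return asyncio.run(delete_model())


deleted = FakeSyncClient(uri="wss://example.invalid").delete_model(["a", "b"])
```

Because `asyncio.run` cannot be called while another event loop is running in the same thread, a wrapper built this way raises a `RuntimeError` when invoked from inside a coroutine; in that setting the asynchronous client is the natural choice.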

upload_model

upload_model(
    dataspace_uri: str | DataspaceURI,
    ml_objects: Sequence[AbstractCitedDataObject],
    data_arrays: Mapping[str, Sequence[NDArray[LogicalArrayDTypes]]] = {},
    debounce: bool | float = False,
) -> list[str]

Method for uploading data to an ETP server. This method takes in a dataspace uri (for uploading to multiple dataspaces you need to call RDDMSClient.upload_model multiple times), a set of RESQML-objects, and a mapping of data arrays that are indexed by their path in resource (which is found in the RESQML-objects as well).

PARAMETER DESCRIPTION
dataspace_uri

An ETP dataspace uri.

TYPE: str | DataspaceURI

ml_objects

A sequence of RESQML v2.0.1-objects.

TYPE: Sequence[AbstractCitedDataObject]

data_arrays

A mapping, e.g., a dictionary, of data arrays keyed by their path in resource (found in the RESQML-objects). Default is {}, meaning that only the RESQML-objects will be uploaded.

TYPE: Mapping[str, Sequence[NDArray[LogicalArrayDTypes]]] DEFAULT: {}

debounce

Parameter to decide if RDDMSClient.upload_model should retry starting a transaction if it initially fails. See RDDMSClient.start_transaction for a more in-depth explanation of the parameter. Default is False, i.e., no debouncing will occur and the method will fail if it is unable to start a transaction.

TYPE: bool | float DEFAULT: False

RETURNS DESCRIPTION
list[str]

A list of ETP data object uris to the uploaded objects.

See Also

RDDMSClient.upload_model: The asynchronous version of this method.

RDDMSClientSync.download_models: The reverse operation.
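
Since the keys of `data_arrays` must match the paths in resource found in the RESQML-objects, a misspelled key is easy to miss. The following is a minimal sketch of a pre-upload check, assuming the referenced paths have already been extracted from the objects into a set. The helper `check_array_keys` and the example paths are hypothetical, and nested lists stand in for NumPy arrays.

```python
from collections.abc import Mapping, Sequence


def check_array_keys(
    referenced_paths: set[str],
    data_arrays: Mapping[str, Sequence[object]],
) -> tuple[set[str], set[str]]:
    """Return (referenced paths without arrays, keys no object references)."""
    keys = set(data_arrays)
    return referenced_paths - keys, keys - referenced_paths


# Hypothetical path in resource, as it would appear in a RESQML-object.
referenced_paths = {"/RESQML/grid/zvalues"}

# Nested lists stand in for NumPy arrays; note the misspelled key.
data_arrays = {"/RESQML/grid/zvalue": [[[1.0, 2.0], [3.0, 4.0]]]}

missing, unused = check_array_keys(referenced_paths, data_arrays)
```

A non-empty `unused` set usually points to a typo in a key. A non-empty `missing` set is not necessarily an error, since objects can be uploaded without arrays.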

Source code in src/rddms_io/sync_client.py
def upload_model(
    self,
    dataspace_uri: str | DataspaceURI,
    ml_objects: Sequence[ro.AbstractCitedDataObject],
    data_arrays: typing.Mapping[
        str, Sequence[npt.NDArray[LogicalArrayDTypes]]
    ] = {},
    debounce: bool | float = False,
) -> list[str]:
    """
    Method for uploading data to an ETP server. This method takes in a
    dataspace uri (for uploading to multiple dataspaces you need to call
    `RDDMSClient.upload_model` multiple times), a set of RESQML-objects,
    and a mapping of data arrays that are indexed by their path in resource
    (which is found in the RESQML-objects as well).

    Parameters
    ----------
    dataspace_uri
        An ETP dataspace uri.
    ml_objects
        A sequence of RESQML v2.0.1-objects.
    data_arrays
        A mapping, e.g., a dictionary, of data arrays keyed by their path
        in resource (found in the RESQML-objects). Default is `{}`,
        meaning that only the RESQML-objects will be uploaded.
    debounce
        Parameter to decide if `RDDMSClient.upload_model` should retry
        starting a transaction if it initially fails. See
        `RDDMSClient.start_transaction` for a more in-depth explanation of
        the parameter. Default is `False`, i.e., no debouncing will occur
        and the method will fail if it is unable to start a transaction.

    Returns
    -------
    list[str]
        A list of ETP data object uris to the uploaded objects.

    See Also
    --------
    [`RDDMSClient.upload_model`][rddms_io.client.RDDMSClient.upload_model]:
        The asynchronous version of this method.

    [`RDDMSClientSync.download_models`][rddms_io.sync_client.RDDMSClientSync.download_models]:
        The reverse operation.
    """

    async def upload_model() -> list[str]:
        async with rddms_connect(**self.connection_args) as rddms_client:
            return await rddms_client.upload_model(
                dataspace_uri=dataspace_uri,
                ml_objects=ml_objects,
                data_arrays=data_arrays,
                handle_transaction=True,
                debounce=debounce,
            )

    return asyncio.run(upload_model())

download_models

download_models(
    ml_uris: list[str | DataObjectURI],
    download_arrays: bool = False,
    download_linked_objects: bool = False,
) -> list[RDDMSModel]

Download RESQML-models from the RDDMS server. A model in this sense is a RESQML-object (with a given uri) and possibly with any connected arrays and referenced objects.

PARAMETER DESCRIPTION
ml_uris

A list of ETP data object uris.

TYPE: list[str | DataObjectURI]

download_arrays

A flag to toggle if any referenced arrays should be downloaded alongside the RESQML-objects. Setting to True will populate the RDDMSModel.arrays-field with a dictionary with the path_in_hdf_file as the key, and the arrays as the values. If the flag is set to False no arrays will be downloaded, and the corresponding field will be empty. Default is False.

TYPE: bool DEFAULT: False

download_linked_objects

Flag to toggle if linked objects (target-objects), i.e., objects referenced by objects from ml_uris, should be downloaded. For example, setting the flag to True and passing in a single obj_Grid2dRepresentation-uri in the ml_uris will try to download any linked coordinate systems or any other referenced objects. The linked objects will be added to RDDMSModel.linked_models, along with arrays if download_arrays=True. Note that if any of the linked objects are already in ml_uris they will be included both as a top-level RDDMSModel, and as a linked-model under a model that references it. The method only looks for objects linked one level down (corresponding to depth = 1 in GetResources), and it will ignore obj_EpcExternalPartReference- and EpcExternalPartReference-objects. Default is False meaning no linked objects will be downloaded.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
list[RDDMSModel]

A list of RDDMSModel-objects.

See Also

RDDMSClient.download_models: The asynchronous version of this method.

Source code in src/rddms_io/sync_client.py
def download_models(
    self,
    ml_uris: list[str | DataObjectURI],
    download_arrays: bool = False,
    download_linked_objects: bool = False,
) -> list[RDDMSModel]:
    """
    Download RESQML-models from the RDDMS server. A model in this sense is
    a RESQML-object (with a given uri) and possibly with any connected
    arrays and referenced objects.

    Parameters
    ----------
    ml_uris
        A list of ETP data object uris.
    download_arrays
        A flag to toggle if any referenced arrays should be downloaded
        alongside the RESQML-objects. Setting to `True` will populate
        [`RDDMSModel.arrays`][rddms_io.data_types.RDDMSModel.arrays]-field
        with a dictionary with the `path_in_hdf_file` as the key, and the
        arrays as the values. If the flag is set to `False` no arrays will
        be downloaded, and the corresponding field will be empty. Default
        is `False`.
    download_linked_objects
        Flag to toggle if linked objects (target-objects), i.e., objects
        referenced by objects from `ml_uris`, should be downloaded. For
        example, setting the flag to `True` and passing in a single
        `obj_Grid2dRepresentation`-uri in the `ml_uris` will try to
        download any linked coordinate systems or any other referenced
        objects. The linked objects will be added to
        [`RDDMSModel.linked_models`][rddms_io.data_types.RDDMSModel.linked_models],
        along with arrays if `download_arrays=True`.
        Note that if any of the linked objects are already in `ml_uris`
        they will be included both as a top-level `RDDMSModel`, and as a
        linked-model under a model that references it. The method only
        looks for objects linked one level down (corresponding to `depth =
        1` in `GetResources`), and it will ignore
        `obj_EpcExternalPartReference`- and
        `EpcExternalPartReference`-objects.  Default is `False` meaning no
        linked objects will be downloaded.

    Returns
    -------
    list[RDDMSModel]
        A list of [`RDDMSModel`][rddms_io.data_types.RDDMSModel]-objects.

    See Also
    --------
    [`RDDMSClient.download_models`][rddms_io.client.RDDMSClient.download_models]:
        The asynchronous version of this method.
    """

    async def download_models() -> list[RDDMSModel]:
        async with rddms_connect(**self.connection_args) as rddms_client:
            return await rddms_client.download_models(
                ml_uris=ml_uris,
                download_arrays=download_arrays,
                download_linked_objects=download_linked_objects,
            )

    return asyncio.run(download_models())

Data types

Data types returned by RDDMSClient in various methods.

rddms_io.data_types

RDDMSModel

Bases: NamedTuple

Container for results after calling RDDMSClient.download_models.

ATTRIBUTE DESCRIPTION
obj

The main object in the model, i.e., the object that is referenced by a passed in uri in the argument ml_uris in RDDMSClient.download_models.

TYPE: AbstractCitedDataObject

arrays

A dictionary with arrays referenced by obj (if any). The keys are found in the field path_in_hdf_file of any Hdf5Dataset-objects occurring in obj.

TYPE: dict[str, list[NDArray[LogicalArrayDTypes]]]

linked_models

A list of RDDMSModel-objects where the RDDMSModel.obj-field is an object referenced by the current obj. These linked models might also contain arrays and linked models themselves.

TYPE: list[RDDMSModel]

obj instance-attribute
arrays instance-attribute
arrays: dict[str, list[NDArray[LogicalArrayDTypes]]]
linked_models instance-attribute
linked_models: list[RDDMSModel]
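
Because linked models can carry arrays and linked models of their own, a result is naturally processed recursively. The following is a minimal sketch that gathers every array key in a model tree. `Model` is a hypothetical stand-in that mirrors the three fields of `RDDMSModel`, with a string in place of the RESQML-object and nested lists in place of NumPy arrays.

```python
from typing import NamedTuple


class Model(NamedTuple):
    """Hypothetical stand-in mirroring the fields of `RDDMSModel`."""

    obj: str  # The real field holds a RESQML-object, not a string.
    arrays: dict[str, list[list[float]]]
    linked_models: list["Model"]


def collect_array_paths(model: Model) -> list[str]:
    """Collect array keys from a model and, recursively, its linked models."""
    paths = list(model.arrays)
    for linked in model.linked_models:
        paths.extend(collect_array_paths(linked))
    return paths


# A grid with one array, linked to a coordinate system without arrays.
crs = Model(obj="crs", arrays={}, linked_models=[])
grid = Model(
    obj="grid",
    arrays={"/RESQML/grid/zvalues": [[0.0, 1.0]]},
    linked_models=[crs],
)

paths = collect_array_paths(grid)
```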

LinkedObjects

Bases: NamedTuple

Container for results after calling RDDMSClient.list_linked_objects. Objects in RESQML are structured as graphs, where objects can point to other objects. If object A has a reference to object B, we say that A is a source to B, and that B is a target to A.

ATTRIBUTE DESCRIPTION
start_uri

The uri of the object whose links are listed.

TYPE: str

self_resource

The Resource of the object identified by start_uri.

TYPE: Resource

source_resources

A list of Resource-objects that act as sources to the object referenced by start_uri.

TYPE: list[Resource]

source_edges

A list of Edge-objects that describe how the sources link to the object referenced by start_uri.

TYPE: list[Edge]

target_resources

A list of Resource-objects that act as targets to the object referenced by start_uri.

TYPE: list[Resource]

target_edges

A list of Edge-objects that describe how the targets link to the object referenced by start_uri.

TYPE: list[Edge]

start_uri instance-attribute
start_uri: str
self_resource instance-attribute
self_resource: Resource
source_resources instance-attribute
source_resources: list[Resource]
source_edges instance-attribute
source_edges: list[Edge]
target_resources instance-attribute
target_resources: list[Resource]
target_edges instance-attribute
target_edges: list[Edge]
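
The source and target terminology can be illustrated on a small reference graph. The following is a minimal sketch, assuming each link is stored as a pair where the source holds a reference to the target. `Link` and `split_links` are hypothetical names and do not correspond to the ETP `Edge` type or to any function in `rddms_io`.

```python
from typing import NamedTuple


class Link(NamedTuple):
    """Hypothetical edge: `source` holds a reference to `target`."""

    source: str
    target: str


def split_links(start_uri: str, links: list[Link]) -> tuple[list[str], list[str]]:
    """Return (sources, targets) of the object identified by `start_uri`."""
    sources = [link.source for link in links if link.target == start_uri]
    targets = [link.target for link in links if link.source == start_uri]
    return sources, targets


# A property references a grid, and the grid references a coordinate system.
links = [
    Link(source="property", target="grid"),
    Link(source="grid", target="crs"),
]

sources, targets = split_links("grid", links)
```

Seen from the grid, the property is a source and the coordinate system is a target, which matches the split between `source_resources` and `target_resources` above.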