Using the RDDMS Client

In this tutorial we will upload, search, download, and delete the regular surface model from the tutorial "Setting up a regular surface".

Nomenclature

The Reservoir Domain Data Management Services (RDDMS) is a category under the OSDU data platform for working with reservoir-related models. One of its constituents is the open-etp-server, a server that stores and validates RESQML and WITSML models and communicates via the Energistics Transfer Protocol (ETP) v1.2.

We will use the expressions ETP server and RDDMS server interchangeably when referring to the open-etp-server.

Accessing the ETP server

Access to the open-etp-server depends on how and where the server is hosted. This library uses a local ETP server for testing purposes. In this case the server is open, and no authentication is needed.

Connecting to the local server

The local ETP server can be started via the Docker Compose file tests/compose.yml. To start it, run the following command (assuming that the current working directory is the top of the pyetp-directory):

docker compose -f tests/compose.yml up
The server is then available at ws://localhost:9100. You can check the server capabilities at http://localhost:9100/.well-known/etp-server-capabilities?GetVersion=etp12.energistics.org. Neither an access token nor a data partition id is needed for the local server. See the full compose file below.
# Local ETP-server for testing of the map-api
services:
  open-etp-server:
    # image: community.opengroup.org:5555/osdu/platform/domain-data-mgmt-services/reservoir/open-etp-server/open-etp-server-v0-28-0:latest
    # image: community.opengroup.org:5555/osdu/platform/domain-data-mgmt-services/reservoir/open-etp-server/open-etp-server-release-0-28:latest
    image: community.opengroup.org:5555/osdu/platform/domain-data-mgmt-services/reservoir/open-etp-server/open-etp-server-main:latest
    # image: community.opengroup.org:5555/osdu/platform/domain-data-mgmt-services/reservoir/open-etp-server/open-etp-server-v0-28-5:latest
    environment:
      RDMS_DATA_PARTITION_MODE: "single"
      RDMS_DATA_CONNECTIVITY_MODE: "osdu"
      # Note that port, dbname, user and password are specified in the open-etp-postgres service
      POSTGRESQL_CONN_STRING: "host=open-etp-postgres port=5432 dbname=pear user=testyuser password=testypass"
    ports:
      - 9100:9002
    networks:
      - web
    depends_on:
      open-etp-postgres:
        # Wait until postgres server is ready
        condition: service_healthy
    command: [ "openETPServer", "server", "--start", "--overwrite", "--authN", "none", "--authZ", "none" ]

  open-etp-postgres:
    image: postgres
    ports:
      - 5432:5432
    environment:
      # Set postgres username, password and database name
      POSTGRES_PASSWORD: testypass
      POSTGRES_USER: testyuser
      POSTGRES_DB: pear
    healthcheck:
      # As we have set a user and database name, we need to specify this in the
      # pg_isready-command for postgres when checking to see if the database is
      # ready.
      test: [ "CMD", "pg_isready", "-U", "testyuser", "-d", "pear" ]
      interval: 10s
      timeout: 5s
      retries: 5
    command:  |
      postgres
        -c max_wal_size=10GB
    networks:
      - web

networks:
  web:
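
The containers need some time to start, and the ETP server waits for the database to become healthy. The sketch below polls the capabilities endpoint mentioned above until it answers. The helper name wait_for_server and its parameters are hypothetical and not part of pyetp; it uses only the Python standard library and assumes that an HTTP 200 response means the server is ready.

```python
import time
import urllib.request

CAPABILITIES_URL = (
    "http://localhost:9100/.well-known/etp-server-capabilities"
    "?GetVersion=etp12.energistics.org"
)


def wait_for_server(
    url=CAPABILITIES_URL,
    timeout=60.0,
    interval=1.0,
    opener=urllib.request.urlopen,
):
    """Poll `url` until it answers with HTTP 200 or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with opener(url, timeout=interval) as response:
                if response.status == 200:
                    return True
        except OSError:
            # Covers refused connections and urllib errors: not ready yet.
            pass
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

Calling wait_for_server() after docker compose up returns True once the endpoint responds, and False if it has not responded within the timeout.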

When the server is hosted alongside other OSDU Data Platform services as part of Microsoft ADME, access to the server requires a valid JSON web token (JWT) for authentication and a data-partition-id as the server runs in multiple-partition mode. See the documentation for msal on getting an access token to the ETP server when it is hosted on Azure. The data-partition-id is part of the configuration of the server and must be communicated alongside ids and potential secrets needed to get a token.

Getting an access token from msal

Below follows an example of how to get an access token using the msal.PublicClientApplication-class. It is loosely based on the examples from the msal-documentation. The script assumes that a .env-file is located alongside it and that python-dotenv is installed (pip install python-dotenv).

import msal
from dotenv import dotenv_values

env = dotenv_values(".env")

tenant_id = env["TENANT_ID"]
client_id = env["CLIENT_ID"]
scope = env["SCOPE"]

# RDDMS url and data partition id.
etp_url = env["RDDMS_URL"]
data_partition_id = env["DATA_PARTITION_ID"]


# Get access token using the public client flow.
app = msal.PublicClientApplication(
    client_id=client_id,
    authority=f"https://login.microsoftonline.com/{tenant_id}",
)

# Ask the user to log in via the browser.
result = app.acquire_token_interactive(scopes=[scope])
if "error" in result:
    raise Exception(f"Unable to get token: {result}")

# Prepend "Bearer " to the access token.
access_token = "Bearer " + result["access_token"]

The .env-file should have the form:

TENANT_ID = "..."
CLIENT_ID = "..."
SCOPE = "..."

RDDMS_URL = "wss://..."
DATA_PARTITION_ID = "..."
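
A missing or empty entry in the .env-file otherwise only shows up later as a KeyError or a failed login. The sketch below checks all five values up front. The helper name require_settings is hypothetical and not part of pyetp or python-dotenv; it only assumes a mapping like the one returned by dotenv_values, where a key without a value maps to None.

```python
from collections.abc import Mapping
from typing import Optional

REQUIRED_KEYS = (
    "TENANT_ID",
    "CLIENT_ID",
    "SCOPE",
    "RDDMS_URL",
    "DATA_PARTITION_ID",
)


def require_settings(env: Mapping[str, Optional[str]]) -> dict[str, str]:
    """Return the required settings, or raise if any is missing or empty."""
    missing = [key for key in REQUIRED_KEYS if not env.get(key)]
    if missing:
        raise KeyError(f"Missing values in .env-file: {', '.join(missing)}")
    return {key: str(env[key]) for key in REQUIRED_KEYS}
```

In the script above this could be used as settings = require_settings(dotenv_values(".env")).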

The following example will use the local ETP server, but we will make a note on where the access token and data partition id should be included.

Connecting to the ETP server with RDDMSClient

To set up a connection we recommend using the rddms_connect-class as an asynchronous context manager (see the documentation for rddms_connect on other ways of setting up a connection). This ensures that the connection is properly closed after the program leaves the context manager.

Asynchronous Python in scripts

Both RDDMSClient and ETPClient use an asynchronous websockets-client and are intended to be used in an asynchronous fashion. However, Python does not support await-statements directly in a script unless they are wrapped in an async def-function that is called via asyncio.run. This is why the following tutorial is wrapped inside an async def main()-function that is called by asyncio.run(main()) at the end.

In a Jupyter notebook, an IPython shell, or a Python REPL started with python -m asyncio, you can skip the wrapper function and use await directly.
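
The pattern can be tried without a server. The following minimal sketch uses a hypothetical stand-in, fake_connect, built with contextlib.asynccontextmanager. It opens no socket and is not related to rddms_connect beyond being used in the same async with-style.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def fake_connect(uri: str):
    """Hypothetical stand-in for a connection; it opens no real socket."""
    state = {"uri": uri, "open": True}
    try:
        yield state
    finally:
        # Runs when the `async with` block exits, also on errors.
        state["open"] = False


async def main() -> dict:
    async with fake_connect("ws://localhost:9100") as connection:
        await asyncio.sleep(0)  # placeholder for awaited client calls
        assert connection["open"]
    return connection


result = asyncio.run(main())
print(result["open"])  # False: the context manager has closed the connection
```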

Uploading a regular surface to the ETP server

We start by importing the necessary libraries and setting up a regular surface.

import asyncio

import numpy as np

from rddms_io import rddms_connect
import resqml_objects.v201 as ro


z = np.random.random((101, 103))
origin = np.array([10.0, 11.0])
spacing = np.array([1.0, 0.9])
u1 = np.array([np.sqrt(3.0) / 2.0, 0.5])
u2 = np.array([-0.5, np.sqrt(3.0) / 2.0])

originator = "<name/username/email>"
epc = ro.obj_EpcExternalPartReference(
    citation=ro.Citation(title="Demo epc", originator=originator)
)
crs = ro.obj_LocalDepth3dCrs(
    citation=ro.Citation(
        title="Demo crs",
        originator=originator,
    ),
    vertical_crs=ro.VerticalCrsEpsgCode(epsg_code=6230),
    projected_crs=ro.ProjectedCrsEpsgCode(epsg_code=23031),
)
gri = ro.obj_Grid2dRepresentation.from_regular_surface(
    citation=ro.Citation(
        title="Demo grid",
        originator=originator,
    ),
    crs=crs,
    epc_external_part_reference=epc,
    shape=z.shape,
    origin=origin,
    spacing=spacing,
    unit_vec_1=u1,
    unit_vec_2=u2,
)
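
The vectors u1 and u2 above are orthogonal unit vectors rotated 30 degrees relative to the coordinate axes. The sketch below shows one way to compute the horizontal position of every grid node with NumPy. It assumes that node (i, j) lies at origin + i * spacing[0] * u1 + j * spacing[1] * u2. This indexing convention is an assumption made for illustration and is not taken from the library.

```python
import numpy as np

shape = (101, 103)
origin = np.array([10.0, 11.0])
spacing = np.array([1.0, 0.9])
u1 = np.array([np.sqrt(3.0) / 2.0, 0.5])
u2 = np.array([-0.5, np.sqrt(3.0) / 2.0])

# Node indices along the two grid axes.
i, j = np.meshgrid(np.arange(shape[0]), np.arange(shape[1]), indexing="ij")

# Assumed convention: step i * spacing[0] along u1 and j * spacing[1] along u2.
xy = (
    origin
    + (i * spacing[0])[..., None] * u1
    + (j * spacing[1])[..., None] * u2
)

print(xy.shape)  # (101, 103, 2)
```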
Next, we define an async def main-function to wrap the asynchronous code. In this example we use the local ETP server discussed in the previous section as seen in the uri variable. As such there is no data_partition_id and no access_token needed. See the previous section for what these should be set to if the server is set up in the cloud.
async def main() -> tuple:
    uri = "ws://localhost:9100"
    data_partition_id = None
    access_token = None
    dataspace_path = "rddms_io/demo"
The variable dataspace_path = "rddms_io/demo" corresponds to the full dataspace uri eml:///dataspace('rddms_io/demo').

ETP v1.2 dataspaces

A named dataspace in ETP v1.2 has the form eml:///dataspace('{path}'), where {path} is a string. The open-etp-server adds two additional restrictions for the dataspace.

  1. The default dataspace eml:/// is not supported.
  2. The path is on the form project/scenario, and is limited to a single separator /.

All methods in RDDMSClient that take in a dataspace uri will also accept a dataspace path.
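
The mapping from a path to a dataspace uri, together with the project/scenario restriction, can be sketched as a small helper. The function name dataspace_uri_from_path is hypothetical and not part of RDDMSClient. Rejecting single quotes in the path is an extra assumption made here to keep the resulting uri unambiguous.

```python
import re

# Two non-empty parts separated by exactly one "/", without single quotes.
_PATH_PATTERN = re.compile(r"[^/']+/[^/']+")


def dataspace_uri_from_path(path: str) -> str:
    """Build the full dataspace uri from a path of the form project/scenario."""
    if not _PATH_PATTERN.fullmatch(path):
        raise ValueError(
            f"Expected a path of the form 'project/scenario', got {path!r}"
        )
    return f"eml:///dataspace('{path}')"


print(dataspace_uri_from_path("rddms_io/demo"))
# eml:///dataspace('rddms_io/demo')
```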

Connecting to the server and creating a dataspace

We use rddms_connect as a context manager using async with to connect to the RDDMS server, viz.:

    async with rddms_connect(
        uri=uri,
        data_partition_id=data_partition_id,
        authorization=access_token,
    ) as rddms_client:
where rddms_client is an instance of RDDMSClient.

Having connected we can create our dataspace using the method RDDMSClient.create_dataspace.

        await rddms_client.create_dataspace(
            dataspace_path,
            legal_tags=["legal-tag-1", "legal-tag-2"],
            other_relevant_data_countries=["country-code"],
            owners=["owners"],
            viewers=["viewers-1", "viewers-2"],
            ignore_if_exists=True,
        )

We have included dummy values for the access control lists (ACL), but they are only needed when the server is integrated with OSDU. The flag ignore_if_exists=True ensures that the program does not crash if the dataspace already exists.

Altering a dataspace

A dataspace, once created, cannot be altered directly. If the ACLs have been set up incorrectly, the dataspace must instead be emptied, deleted, and then re-created.

Uploading the surface

To upload the regular surface we call the method RDDMSClient.upload_model, and pass in the dataspace_path, a list of the three objects, and a dictionary (this can be any dict-like mapping) with the path_in_hdf_file of the surface array as the key and the array itself as the value.

        await rddms_client.upload_model(
            dataspace_path,
            ml_objects=[epc, crs, gri],
            data_arrays={
                gri.grid2d_patch.geometry.points.zvalues.values.path_in_hdf_file: z,
            },
        )

Multiple writers to the same dataspace

The open-etp-server only allows one writer to a dataspace at a time. This is enforced via transactions. When there is contention for write access, the optional argument debounce in RDDMSClient.upload_model can be used to have the client wait until the dataspace is free for writing instead of crashing. Set debounce either to a non-zero float-value or to the boolean True. In the former case the float-value is the maximum number of seconds that the client will wait for a transaction before crashing. In the latter case the client will not time out, and keeps waiting until a transaction is available.
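
The two debounce modes can be illustrated with a generic retry loop. This is only a sketch of the behavior described above and not how RDDMSClient.upload_model is implemented. The exception Busy, the helper retry_while_busy, and the poll_interval parameter are all hypothetical.

```python
import asyncio
import time


class Busy(Exception):
    """Hypothetical error raised while another writer holds the dataspace."""


async def retry_while_busy(action, debounce=False, poll_interval=0.01):
    """Call `action` until it succeeds, following the debounce rules above."""
    if debounce is False or debounce == 0:
        return await action()  # no waiting: a busy dataspace raises at once
    # True means no deadline; a number is the maximum wait in seconds.
    deadline = None if debounce is True else time.monotonic() + float(debounce)
    while True:
        try:
            return await action()
        except Busy:
            if deadline is not None and time.monotonic() >= deadline:
                raise
            await asyncio.sleep(poll_interval)


async def demo():
    attempts = 0

    async def start_transaction():
        nonlocal attempts
        attempts += 1
        if attempts < 3:
            raise Busy("dataspace is locked by another writer")
        return "transaction started"

    result = await retry_while_busy(start_transaction, debounce=True)
    return result, attempts


outcome = asyncio.run(demo())
print(outcome)  # ('transaction started', 3)
```

With the real client the corresponding choice is made by passing debounce to RDDMSClient.upload_model, as described above.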

Searching on the ETP server

Searching using ETP can roughly be divided into two kinds: searching for dataspaces and searching for data objects.

The search for dataspaces will look for all available dataspaces on the server, with an optional filter based on time for when the dataspace was last written to. The method RDDMSClient.list_dataspaces applies this kind of search.

        dataspaces = await rddms_client.list_dataspaces()
An example output using rich.print on the results gives:
[
    Dataspace(
        uri="eml:///dataspace('rddms_io/demo')",
        store_last_write=1772049333806932,
        store_created=1772049333683418,
        path='rddms_io/demo',
        custom_data={
            'legaltags': DataValue(item=ArrayOfString(values=['legal-tag-1', 'legal-tag-2'])),
            'locked': DataValue(item=False),
            'otherRelevantDataCountries': DataValue(item=ArrayOfString(values=['country-code'])),
            'owners': DataValue(item=ArrayOfString(values=['owners'])),
            'read-only': DataValue(item=False),
            'size': DataValue(item='544 kB'),
            'viewers': DataValue(item=ArrayOfString(values=['viewers-1', 'viewers-2']))
        }
    )
]
The returned result is a list of Dataspace-objects. Note that the ACLs are filled into the custom_data-field, as they are not a part of the ETP-standard but are added on top by the open-etp-server when running in an OSDU context. The Dataspace object is described in section 23.34.10 in the ETP v1.2 standard.
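
The fields store_last_write and store_created are integer timestamps. ETP v1.2 counts timestamps in microseconds since the Unix epoch (UTC), which also agrees with the date in the name of the server-generated obj_Activity-object shown later in this tutorial. A minimal sketch for turning such a value into a datetime follows; the helper name etp_timestamp_to_datetime is hypothetical and not part of pyetp.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def etp_timestamp_to_datetime(microseconds: int) -> datetime:
    """Convert microseconds since the Unix epoch to an aware UTC datetime."""
    # Integer arithmetic via timedelta avoids float rounding of microseconds.
    return EPOCH + timedelta(microseconds=microseconds)


print(etp_timestamp_to_datetime(1772049333806932).isoformat())
# 2026-02-25T19:55:33.806932+00:00
```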

For data objects we can apply more filters and more involved patterns. To list all data objects under a dataspace use the method RDDMSClient.list_objects_under_dataspace. A useful filter to apply on this method is the data_object_types-argument. This lets you limit the results to only certain kinds of RESQML-objects (it is an empty list by default, and will return all objects). Below is an example where we search for all obj_Grid2dRepresentation-objects under our dataspace.

        gri_resources = await rddms_client.list_objects_under_dataspace(
            dataspace_path,
            data_object_types=[ro.obj_Grid2dRepresentation],
        )
Printing gri_resources gives a description of the gri-object that we uploaded.
[
    Resource(
        uri="eml:///dataspace('rddms_io/demo')/resqml20.obj_Grid2dRepresentation(e8a58c7e-d870-43d3-87d9-81ae18d8df7b)",
        name='Demo grid',
        source_count=1,
        target_count=1,
        last_changed=1772049333565385,
        store_last_write=1772049333694683,
        store_created=1772049333694683,
        active_status=<ActiveStatusKind.ACTIVE: 'Active'>,
        alternate_uris=[],
        custom_data={'created': DataValue(item=1772049333565385), 'creator': DataValue(item='<name/username/email>')}
    )
]
The result is a list of Resource-objects. A full description of the Resource-object can be found under section 23.34.11 in the ETP v1.2 standard. Beyond the uri and name (corresponding to the title-field in the Citation-object), we note the source_count and target_count fields. In RESQML (and the other *ML-standards from Energistics) the data is laid out as a directed graph. If we have a node \(A\) pointing to a node \(B\), then we say that \(A\) acts as a source to \(B\), and \(B\) acts as a target to \(A\).

Looking back at the output from gri_resources, we can then infer that the obj_Grid2dRepresentation-object points to one other object (target_count=1), and that one object points to it (source_count=1).

When setting up our regular surface we know that the gri-object references a local coordinate system (an obj_LocalDepth3dCrs-object), but also an obj_EpcExternalPartReference, so we would expect the target_count to be 2. However, the open-etp-server does not include the obj_EpcExternalPartReference-object in this count, which is fine as we rarely need it after it has been uploaded (we would only need it if we were to write the model to disk in the .epc-format).

Necessity of obj_EpcExternalPartReference

The purpose of obj_EpcExternalPartReference is to signal that array data is stored alongside the RESQML-objects. The typical use case is a RESQML-model stored to disk in the .epc-format with a corresponding .h5-file holding the array data. In this case the obj_EpcExternalPartReference tells us how the array data is stored via the mime_type-field. On the open-etp-server we do not need to know how the objects and arrays are stored, so we do not read the information contained in the obj_EpcExternalPartReference-object. However, the object is still necessary because an array is referenced via two keys: the data object uri of the referenced obj_EpcExternalPartReference and the path_in_hdf_file-field in the Hdf5Dataset-object. Both keys can be constructed directly from the Hdf5Dataset, so we do not need the obj_EpcExternalPartReference-object itself. As such, the object is only needed when uploading to the open-etp-server, and after that it is only used indirectly.

For the source_count the open-etp-server will automatically add an obj_Activity-object (and an obj_ActivityTemplate-object that is referenced by the obj_Activity-object) referencing the grid-object.
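
The counts follow directly from the edges of the graph. The sketch below rebuilds them for the objects in this tutorial from a plain list of (source, target) pairs. The labels are shortened stand-ins for the full uris, and the edge to the obj_EpcExternalPartReference-object is left out to mirror what the server counts.

```python
from collections import Counter

# (source, target) pairs; labels are shortened stand-ins for the full uris.
edges = [
    ("Activity", "Grid2dRepresentation"),
    ("Activity", "ActivityTemplate"),
    ("Grid2dRepresentation", "LocalDepth3dCrs"),
]

# An object's target_count is the number of edges leaving it,
# and its source_count is the number of edges arriving at it.
target_count = Counter(source for source, _ in edges)
source_count = Counter(target for _, target in edges)

for name in ("Activity", "Grid2dRepresentation", "LocalDepth3dCrs"):
    print(name, source_count[name], target_count[name])
# Activity 0 2
# Grid2dRepresentation 1 1
# LocalDepth3dCrs 1 0
```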

Using the method RDDMSClient.list_linked_objects we can get an overview of how an object links to its sources and targets.

        gri_lo = await rddms_client.list_linked_objects(
            start_uri=gri_resources[0].uri,
        )
In this case we fetch the grid-uri from the gri_resources returned by the call to RDDMSClient.list_objects_under_dataspace. From RDDMSClient.list_linked_objects we get a LinkedObjects-object with Resource-objects for the grid-object and for the sources and targets, as well as Edge-objects for the sources and targets. The Edge-objects describe the relationship between the grid-object and its sources and targets. See section 23.34.13 in the ETP v1.2 standard for the full description of the Edge-object. Below we show the output of running rich.print(gri_lo).
LinkedObjects(
    start_uri="eml:///dataspace('rddms_io/demo')/resqml20.obj_Grid2dRepresentation(e8a58c7e-d870-43d3-87d9-81ae18d8df7b)",
    self_resource=Resource(
        uri="eml:///dataspace('rddms_io/demo')/resqml20.obj_Grid2dRepresentation(e8a58c7e-d870-43d3-87d9-81ae18d8df7b)",
        name='Demo grid',
        source_count=1,
        target_count=1,
        last_changed=1772049333565385,
        store_last_write=1772049333694683,
        store_created=1772049333694683,
        active_status=<ActiveStatusKind.ACTIVE: 'Active'>,
        alternate_uris=[],
        custom_data={'created': DataValue(item=1772049333565385), 'creator': DataValue(item='<name/username/email>')}
    ),
    source_resources=[
        Resource(
            uri="eml:///dataspace('rddms_io/demo')/resqml20.obj_Activity(df7f8af1-6de4-4745-a484-1db55c55bed2)",
            name='Scenario_automated_import_2026-02-25T19:55:33.803Z',
            source_count=0,
            target_count=2,
            last_changed=1772049333803000,
            store_last_write=1772049333694683,
            store_created=1772049333694683,
            active_status=<ActiveStatusKind.ACTIVE: 'Active'>,
            alternate_uris=[],
            custom_data={'created': DataValue(item=1772049333803000), 'creator': DataValue(item='osdu_user')}
        )
    ],
    source_edges=[
        Edge(
            source_uri="eml:///dataspace('rddms_io/demo')/resqml20.obj_Activity(df7f8af1-6de4-4745-a484-1db55c55bed2)",
            target_uri="eml:///dataspace('rddms_io/demo')/resqml20.obj_Grid2dRepresentation(e8a58c7e-d870-43d3-87d9-81ae18d8df7b)",
            relationship_kind=<RelationshipKind.PRIMARY: 'Primary'>,
            custom_data={'path': DataValue(item='rsq:Parameter/rsq:DataObject')}
        )
    ],
    target_resources=[
        Resource(
            uri="eml:///dataspace('rddms_io/demo')/resqml20.obj_LocalDepth3dCrs(515450e2-2f75-41ba-b1cb-3a268c7ed9ce)",
            name='Demo crs',
            source_count=1,
            target_count=0,
            last_changed=1772049333565355,
            store_last_write=1772049333694683,
            store_created=1772049333694683,
            active_status=<ActiveStatusKind.ACTIVE: 'Active'>,
            alternate_uris=[],
            custom_data={'created': DataValue(item=1772049333565355), 'creator': DataValue(item='<name/username/email>')}
        )
    ],
    target_edges=[
        Edge(
            source_uri="eml:///dataspace('rddms_io/demo')/resqml20.obj_Grid2dRepresentation(e8a58c7e-d870-43d3-87d9-81ae18d8df7b)",
            target_uri="eml:///dataspace('rddms_io/demo')/resqml20.obj_LocalDepth3dCrs(515450e2-2f75-41ba-b1cb-3a268c7ed9ce)",
            relationship_kind=<RelationshipKind.PRIMARY: 'Primary'>,
            custom_data={'path': DataValue(item='rsq:Grid2dPatch/rsq:Geometry/rsq:LocalCrs')}
        )
    ]
)
If you need to see the sources of sources, or targets of targets (or more levels), this can be adjusted with the depth-parameter in the method RDDMSClient.list_linked_objects. A depth of 1 (the default) lists only the closest sources and targets.
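
To illustrate what a larger depth adds, the sketch below walks a small edge list a given number of steps in one direction. It is an interpretation of the depth-parameter for illustration only and does not reproduce how the server or RDDMSClient.list_linked_objects resolves the graph. The function walk and the shortened labels are hypothetical.

```python
edges = [
    ("Activity", "Grid2dRepresentation"),
    ("Activity", "ActivityTemplate"),
    ("Grid2dRepresentation", "LocalDepth3dCrs"),
]


def walk(edges, start, depth=1, direction="targets"):
    """Collect the objects at most `depth` edges away from `start`.

    With direction="targets" edges are followed from source to target,
    with direction="sources" they are followed in reverse.
    """
    found = set()
    frontier = {start}
    for _ in range(depth):
        if direction == "targets":
            frontier = {t for s, t in edges if s in frontier}
        else:
            frontier = {s for s, t in edges if t in frontier}
        found |= frontier
    return sorted(found)


print(walk(edges, "Activity", depth=1))
# ['ActivityTemplate', 'Grid2dRepresentation']
print(walk(edges, "Activity", depth=2))
# ['ActivityTemplate', 'Grid2dRepresentation', 'LocalDepth3dCrs']
print(walk(edges, "LocalDepth3dCrs", depth=2, direction="sources"))
# ['Activity', 'Grid2dRepresentation']
```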

Downloading the surface

To download data from the ETP server we use the method RDDMSClient.download_models. This method takes in a list of data object uris to identify the objects, and will optionally download any connected arrays and any referenced objects (the method only looks for targets one level down).

        ret_models = await rddms_client.download_models(
            ml_uris=[gri_lo.start_uri],
            download_arrays=True,
            download_linked_objects=True,
        )
The method returns a list of RDDMSModel-instances, one per data object uri passed in to RDDMSClient.download_models and in the same order as the uris. Each RDDMSModel contains the object itself (in the field obj), a dictionary of arrays with path_in_hdf_file as keys (under the field arrays), and a list of RDDMSModel for any linked objects (under the field linked_models).

Deleting objects and dataspaces

To finish off this example we will delete all the objects we wrote, and delete the dataspace. To do that we list all objects under the dataspace, delete these objects, and then delete the dataspace.

        all_resources = await rddms_client.list_objects_under_dataspace(
            dataspace_path,
        )
        await rddms_client.delete_model(ml_uris=[a.uri for a in all_resources])
        await rddms_client.delete_dataspace(dataspace_path)

Tip

If you only need to update an object already on the ETP server, it is enough to write a replacement object with the same uuid.

Running the script

The main-function ends by returning objects that are used as example output in this notebook.

    return dataspaces, gri_resources, gri_lo
To run the function in a script you need to call asyncio.run(main()), viz.:
dataspaces, gri_resources, gri_lo = asyncio.run(main())

Full script

The full script contains some assert-checks that are not included in the example above. They are in place to verify that the example code runs as expected, and can be ignored in other circumstances.

import asyncio

import numpy as np

from rddms_io import rddms_connect
import resqml_objects.v201 as ro


z = np.random.random((101, 103))
origin = np.array([10.0, 11.0])
spacing = np.array([1.0, 0.9])
u1 = np.array([np.sqrt(3.0) / 2.0, 0.5])
u2 = np.array([-0.5, np.sqrt(3.0) / 2.0])

originator = "<name/username/email>"
epc = ro.obj_EpcExternalPartReference(
    citation=ro.Citation(title="Demo epc", originator=originator)
)
crs = ro.obj_LocalDepth3dCrs(
    citation=ro.Citation(
        title="Demo crs",
        originator=originator,
    ),
    vertical_crs=ro.VerticalCrsEpsgCode(epsg_code=6230),
    projected_crs=ro.ProjectedCrsEpsgCode(epsg_code=23031),
)
gri = ro.obj_Grid2dRepresentation.from_regular_surface(
    citation=ro.Citation(
        title="Demo grid",
        originator=originator,
    ),
    crs=crs,
    epc_external_part_reference=epc,
    shape=z.shape,
    origin=origin,
    spacing=spacing,
    unit_vec_1=u1,
    unit_vec_2=u2,
)


async def main() -> tuple:
    uri = "ws://localhost:9100"
    data_partition_id = None
    access_token = None
    dataspace_path = "rddms_io/demo"

    async with rddms_connect(
        uri=uri,
        data_partition_id=data_partition_id,
        authorization=access_token,
    ) as rddms_client:
        await rddms_client.create_dataspace(
            dataspace_path,
            legal_tags=["legal-tag-1", "legal-tag-2"],
            other_relevant_data_countries=["country-code"],
            owners=["owners"],
            viewers=["viewers-1", "viewers-2"],
            ignore_if_exists=True,
        )

        await rddms_client.upload_model(
            dataspace_path,
            ml_objects=[epc, crs, gri],
            data_arrays={
                gri.grid2d_patch.geometry.points.zvalues.values.path_in_hdf_file: z,
            },
        )

        dataspaces = await rddms_client.list_dataspaces()

        gri_resources = await rddms_client.list_objects_under_dataspace(
            dataspace_path,
            data_object_types=[ro.obj_Grid2dRepresentation],
        )

        gri_lo = await rddms_client.list_linked_objects(
            start_uri=gri_resources[0].uri,
        )

        ret_models = await rddms_client.download_models(
            ml_uris=[gri_lo.start_uri],
            download_arrays=True,
            download_linked_objects=True,
        )

        assert len(ret_models) == 1

        ret_model = ret_models[0]
        ret_gri = ret_model.obj

        assert len(ret_model.linked_models) == 1

        ret_crs = ret_model.linked_models[0].obj
        ret_z = ret_model.arrays[
            ret_gri.grid2d_patch.geometry.points.zvalues.values.path_in_hdf_file
        ]

        assert ret_gri == gri
        assert ret_crs == crs
        np.testing.assert_equal(z, ret_z)

        all_resources = await rddms_client.list_objects_under_dataspace(
            dataspace_path,
        )
        await rddms_client.delete_model(ml_uris=[a.uri for a in all_resources])
        await rddms_client.delete_dataspace(dataspace_path)

    return dataspaces, gri_resources, gri_lo


dataspaces, gri_resources, gri_lo = asyncio.run(main())