ETP v1.2 Client
The class ETPClient is an asynchronous client
implementation of the Energistics Transfer Protocol (ETP) v1.2 using
websockets.
To set up a connection use etp_connect as an
awaitable, a context manager, or a generator returning instances of the
ETPClient.
This client is meant as a "raw" ETP v1.2 client.
Use RDDMSClient if you want a more high-level
interface to the ETP server.
Setting up a connection
pyetp.client.etp_connect
etp_connect(
uri: str,
data_partition_id: str | None = None,
authorization: str | SecretStr | None = None,
etp_timeout: float | None = None,
max_message_size: float = 2**20,
use_compression: bool = True,
)
Connect to an ETP server via websockets.
This class can act as:
- A context manager handling setup and tear-down of the connection.
- An asynchronous iterator which can be used to persistently retry to connect if the websockets connection drops.
- An awaitable connection that must be manually closed by the user.
See below for examples of all three cases.
| PARAMETER | DESCRIPTION |
|---|---|
uri
|
The uri to the ETP server. This should be the uri to a websockets endpoint.
TYPE:
|
data_partition_id
|
The data partition id used when connecting to the OSDU open-etp-server
in multi-partition mode. Default is
TYPE:
|
authorization
|
Bearer token used for authenticating to the ETP server. This token
should be on the form
TYPE:
|
etp_timeout
|
The timeout in seconds for when to stop waiting for a message from the
ETP server. Setting it to
TYPE:
|
max_message_size
|
The maximum number of bytes for a single websockets message. Default is
TYPE:
|
use_compression
|
Flag to toggle if compression of the messages should be applied. So far
the client (and the server) only supports compression with gzip.
Default is
TYPE:
|
Examples:
An example of connecting to the ETP server using :func:etp_connect as a
context manager is:
async with etp_connect(...) as etp_client:
...
In this case the closing message and the websockets connection is closed once the program exits the context manager.
To persist a connection if the websockets connection is dropped (for any
reason), use :func:etp_connect as an asynchronous generator, viz.:
import websockets
async for etp_client in etp_connect(...):
try:
...
except websockets.ConnectionClosed:
continue
# Include `break` to avoid re-running the whole block if the
# iteration runs without any errors.
break
Note that in this case the whole program under the try-block is re-run
from the start if the iteration completes normally, or if the websockets
connection is dropped. Therefore, make sure to include a break at the end
of the try-block (as in the example above).
The third option is to set up a connection via await and then manually
close the connection once done:
etp_client = await etp_connect(...)
...
await etp_client.close()
Source code in src/pyetp/client.py
__await__
__await__() -> ETPClient
get_additional_headers
Source code in src/pyetp/client.py
__aenter__
async
__aenter__() -> ETPClient
Source code in src/pyetp/client.py
__aexit__
async
__aiter__
async
__aiter__() -> AsyncGenerator[ETPClient]
Source code in src/pyetp/client.py
Client implementation
pyetp.client.ETPClient
Source code in src/pyetp/client.py
requested_protocols
instance-attribute
requested_protocols = [
(get_default_server_supported_protocols(rp))
for rp in [DISCOVERY, STORE, DATA_ARRAY, TRANSACTION, DATASPACE]
]
supported_data_objects
instance-attribute
supported_data_objects = [
SupportedDataObject(qualified_type="eml20.*"),
SupportedDataObject(qualified_type="resqml20."),
]
endpoint_capabilities
instance-attribute
endpoint_capabilities = {
MAX_WEB_SOCKET_MESSAGE_PAYLOAD_SIZE: DataValue(item=max_size)
}
get_default_server_supported_protocols
staticmethod
get_default_server_supported_protocols(
protocol: int | Protocol,
) -> SupportedProtocol
Source code in src/pyetp/client.py
get_server_capabilities_url
staticmethod
Source code in src/pyetp/client.py
get_message_id
send_and_recv
async
send_and_recv(
body: ETPBaseProtocolModel,
multi_part_bodies: list[ETPBaseProtocolModel] = [],
) -> list[ETPBaseProtocolModel]
Source code in src/pyetp/client.py
send
async
Source code in src/pyetp/client.py
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 | |
recv
async
Source code in src/pyetp/client.py
__receiver_loop
async
Source code in src/pyetp/client.py
__aenter__
async
Source code in src/pyetp/client.py
__aexit__
async
__aexit__(
exc_type: Type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None
Source code in src/pyetp/client.py
close
async
Closing method that tears down the ETP-connection via the
ETPClient.__aexit__-method, and closes the websockets connection.
This method should only be used if the user has set up a connection
via etp_client = await etp_connect(...) and will handle the closing
of the connection manually.
Source code in src/pyetp/client.py
parse_error_info
staticmethod
parse_error_info(bodies: list[ETPBaseProtocolModel]) -> list[ErrorInfo]