# Working with Binary Data
This guide explains how to fetch and decode binary bulk data (Protocol Buffers) from the DSIS API.
## Overview
The DSIS API serves data in two formats:
- Metadata (JSON): Via OData - entity properties, relationships, statistics
- Bulk Data (Protocol Buffers): Large binary arrays like horizon z-values, log curves, seismic amplitudes, surface grids
## Installation

To work with binary data, install with protobuf support:

```bash
pip install dsis-schemas[protobuf]
```

Note: Requires Python 3.11+ and protobuf 6.33.0+.
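A quick way to confirm the extra installed correctly is to import one of the decoders (any decoder from the table below works):

```python
# Fails with ImportError if the [protobuf] extra is missing
from dsis_model_sdk.protobuf import decode_horizon_data  # noqa: F401
```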
## Supported Binary Data Types

| Type | Schema | Description | Decoder |
|---|---|---|---|
| Horizon 3D | `HorizonData3D` | Interpreted surface z-values | `decode_horizon_data()` |
| Log Curves | `LogCurve` | Well log measurements | `decode_log_curves()` |
| Seismic 3D | `SeismicDataSet3D` | 3D seismic amplitude volume | `decode_seismic_float_data()` |
| Seismic 2D | `SeismicDataSet2D` | 2D seismic trace data | `decode_seismic_float_data()` |
| Surface Grid | `SurfaceGrid` | Gridded surface data | `decode_lgc_structure()` |
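All of the decoders above are importable from `dsis_model_sdk.protobuf` (each import appears individually in the examples below):

```python
from dsis_model_sdk.protobuf import (
    decode_horizon_data,
    decode_log_curves,
    decode_seismic_float_data,
    decode_lgc_structure,
)
```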
## Two Methods for Fetching Binary Data

### Method 1: `get_bulk_data()` - Load All at Once

Use for small to medium datasets (< 100MB):
```python
from dsis_client import DSISClient, QueryBuilder
from dsis_model_sdk.models.common import HorizonData3D
from dsis_model_sdk.protobuf import decode_horizon_data

# client is an initialized DSISClient instance; construction is omitted here

# Query for entity
query = QueryBuilder(
    model_name="OW5000",
    district_id="123",
    project="SNORRE",
).schema(HorizonData3D)
horizons = list(client.execute_query(query, cast=True, max_pages=1))

# Target a specific entity and fetch binary data
horizon = horizons[0]
bulk_query = query.entity(horizon.native_uid)
binary_data = client.get_bulk_data(bulk_query)

# Decode
if binary_data:
    decoded = decode_horizon_data(binary_data)
```
### Method 2: `get_bulk_data_stream()` - Stream in Chunks

Use for large datasets (> 100MB) to avoid memory issues:
```python
from dsis_model_sdk.models.common import SeismicDataSet3D
from dsis_model_sdk.protobuf import decode_seismic_float_data

# Query for entity
query = QueryBuilder(
    model_name="OW5000",
    district_id="123",
    project="SNORRE",
).schema(SeismicDataSet3D)
datasets = list(client.execute_query(query, cast=True, max_pages=1))

# Stream large dataset in chunks
seismic = datasets[0]
bulk_query = query.entity(seismic.native_uid)

chunks = []
for chunk in client.get_bulk_data_stream(
    bulk_query,
    chunk_size=10 * 1024 * 1024,  # 10MB chunks (DSIS recommended)
    stream_retries=2,
):
    chunks.append(chunk)
    print(f"Downloaded {len(chunk):,} bytes")

# Combine and decode
binary_data = b"".join(chunks)
decoded = decode_seismic_float_data(binary_data)
```
## Using entity() to Target Bulk Data

Use `query.entity(native_uid)` to target a specific entity's binary data field. The `data_field` parameter defaults to `"data"` but can be changed for schemas that use different endpoints:
```python
# Standard bulk data (data_field="data" by default)
bulk_query = query.entity("46075")
binary_data = client.get_bulk_data(bulk_query)

# SurfaceGrid uses the $value endpoint
bulk_query = query.entity("46075", data_field="$value")
binary_data = client.get_bulk_data(bulk_query, accept="application/octet-stream")
```
## Complete Examples

### Example 1: Horizon Data
```python
import numpy as np

from dsis_client import DSISClient, QueryBuilder
from dsis_model_sdk.models.common import HorizonData3D
from dsis_model_sdk.protobuf import decode_horizon_data
from dsis_model_sdk.utils.protobuf_decoders import horizon_to_numpy

# Query for horizons (exclude binary data field for efficiency)
query = QueryBuilder(
    model_name="OW5000",
    district_id="123",
    project="SNORRE",
).schema(HorizonData3D).select("horizon_name,native_uid")
horizons = list(client.execute_query(query, cast=True))

# Fetch binary data for a specific horizon
horizon = horizons[0]
bulk_query = query.entity(horizon.native_uid)
binary_data = client.get_bulk_data(bulk_query)

if binary_data:
    # Decode protobuf
    decoded = decode_horizon_data(binary_data)

    # Convert to NumPy array
    array, metadata = horizon_to_numpy(decoded)

    print(f"Horizon: {horizon.horizon_name}")
    print(f"Grid shape: {array.shape}")
    print(f"Data coverage: {(~np.isnan(array)).sum() / array.size * 100:.1f}%")

    # Analyze valid data
    valid_data = array[~np.isnan(array)]
    print(f"Depth range: {np.min(valid_data):.2f} - {np.max(valid_data):.2f}")
```
### Example 2: Log Curves

```python
from dsis_model_sdk.models.common import LogCurve
from dsis_model_sdk.protobuf import decode_log_curves
from dsis_model_sdk.utils.protobuf_decoders import log_curve_to_dict

# Query for log curves
query = QueryBuilder(
    model_name="OW5000",
    district_id="123",
    project="SNORRE",
).schema(LogCurve).select("log_curve_name,native_uid")
curves = list(client.execute_query(query, max_pages=1))

# Fetch binary data
curve = curves[0]
bulk_query = query.entity(curve["native_uid"])
binary_data = client.get_bulk_data(bulk_query)

if binary_data:
    # Decode
    decoded = decode_log_curves(binary_data)

    end_index = decoded.index.start_index + decoded.index.number_of_index * decoded.index.increment
    print(f"Curve type: {'DEPTH' if decoded.curve_type == decoded.DEPTH else 'TIME'}")
    print(f"Index range: {decoded.index.start_index} to {end_index}")

    # Convert to dict for easier access
    data = log_curve_to_dict(decoded)
    for curve_name, curve_data in data['curves'].items():
        print(f"Curve: {curve_name}")
        print(f"  Unit: {curve_data['unit']}")
        print(f"  Values: {len(curve_data['values'])} samples")
```
### Example 3: Surface Grid Data

Surface grids use the LGCStructure format (Landmark Graphics Corporation tabular structure):
```python
from io import BytesIO

from dsis_model_sdk.protobuf import decode_lgc_structure, LGCStructure_pb2

# Query for grids
query = QueryBuilder(
    model_name="OpenWorksCommonModel",
    district_id="123",
    project="SNORRE",
).schema("SurfaceGrid").select("native_uid,grid_name")
grids = list(client.execute_query(query, cast=True, max_pages=1))

# Fetch binary data (SurfaceGrid uses the $value endpoint)
grid = grids[0]
bulk_query = query.entity(grid.native_uid, data_field="$value")
data = client.get_bulk_data(bulk_query, accept="application/octet-stream")
print(f"Downloaded {len(data):,} bytes")

# LGCStructure messages are length-prefixed with a varint
def read_varint(stream):
    """Read a varint length prefix from a stream."""
    shift = 0
    result = 0
    while True:
        byte_data = stream.read(1)
        if not byte_data:  # stream ended before the varint terminated
            return 0
        byte = byte_data[0]
        result |= (byte & 0x7F) << shift
        if not (byte & 0x80):
            return result
        shift += 7

# Parse the length-prefixed message
stream = BytesIO(data)
size = read_varint(stream)
message_data = stream.read(size)

# Decode
lgc = decode_lgc_structure(message_data)
print(f"Structure name: {lgc.structName}")
print(f"Number of elements: {len(lgc.elements)}")

# Process grid elements (columns/rows)
for i, el in enumerate(lgc.elements[:5]):  # Show first 5
    data_type = LGCStructure_pb2.LGCStructure.LGCElement.DataType.Name(el.dataType)
    if el.dataType == LGCStructure_pb2.LGCStructure.LGCElement.DataType.FLOAT:
        values = el.data_float
    elif el.dataType == LGCStructure_pb2.LGCStructure.LGCElement.DataType.DOUBLE:
        values = el.data_double
    else:
        values = []
    print(f"Element {i}: '{el.elementName}', Type: {data_type}, Values: {len(values):,}")
```
## Important Notes

### Request Timeout

Both `get_bulk_data()` and `get_bulk_data_stream()` accept an optional `timeout` parameter to control how long each HTTP request waits:
```python
# Set a 10-minute timeout for large binary downloads
binary_data = client.get_bulk_data(bulk_query, timeout=600)

# Separate connect and read timeouts
chunks = []
for chunk in client.get_bulk_data_stream(bulk_query, timeout=(5, 600)):
    chunks.append(chunk)
```
- `timeout=None` (default): No timeout; wait indefinitely
- `timeout=600`: Both connect and read timeouts set to 600 seconds
- `timeout=(5, 600)`: Connect timeout of 5s, read timeout of 600s
For `get_bulk_data_stream()`, the timeout is not a cap on the full download. It applies to opening the request and to waiting for the next bytes to arrive from the stream. A large transfer can still run longer overall if data keeps arriving before the read timeout expires.
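If you need a hard cap on total download time, enforce it around the loop yourself. A minimal sketch (`MAX_TOTAL_SECONDS` is an illustrative name, not a client parameter):

```python
import time

MAX_TOTAL_SECONDS = 1800  # overall wall-clock budget for this download
start = time.monotonic()
chunks = []
for chunk in client.get_bulk_data_stream(bulk_query, timeout=(5, 600)):
    chunks.append(chunk)
    if time.monotonic() - start > MAX_TOTAL_SECONDS:
        raise TimeoutError("download exceeded the overall time budget")
```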
### Stream Retries

`get_bulk_data_stream()` also accepts an optional `stream_retries` parameter for transient failures that happen while chunks are being read:
```python
chunks = []
for chunk in client.get_bulk_data_stream(
    bulk_query,
    timeout=(5, 600),
    stream_retries=2,
):
    chunks.append(chunk)
```
- `stream_retries=0` (default): No retries after a stream read failure
- `stream_retries=N`: Retry up to `N` times with incremental waits between attempts
Retries reopen the stream and resume after the bytes already yielded, so this should only be used when the endpoint is expected to return the same bytes across reconnects.
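Conceptually, a resume-capable retry loop looks like the sketch below. This is illustrative only, not the client's actual implementation; `open_stream` is a hypothetical callable that reopens the transfer at a given byte offset:

```python
import time

def stream_with_resume(open_stream, retries=2):
    """Illustrative resume-after-offset retry loop (not the client's code)."""
    delivered = 0  # bytes already yielded to the caller
    attempt = 0
    while True:
        try:
            # open_stream is hypothetical: reopens the transfer at an offset
            for chunk in open_stream(offset=delivered):
                delivered += len(chunk)
                yield chunk
            return  # stream finished cleanly
        except OSError:
            attempt += 1
            if attempt > retries:
                raise
            time.sleep(attempt)  # incremental wait between attempts
```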
### Memory Management

- Small data (< 100MB): Use `get_bulk_data()` - simpler, loads everything at once
- Large data (> 100MB): Use `get_bulk_data_stream()` - streams in chunks, memory-efficient (or spool to disk, as shown below)
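For downloads that should not accumulate in RAM, chunks can be spooled straight to disk as they arrive. A minimal sketch (the file path is illustrative; `bulk_query` and the decoder come from the streaming example above):

```python
# Write chunks to disk as they arrive instead of holding them in memory
with open("seismic_download.bin", "wb") as f:
    for chunk in client.get_bulk_data_stream(bulk_query, chunk_size=10 * 1024 * 1024):
        f.write(chunk)

# Decoding still needs the full payload; read it back when ready
with open("seismic_download.bin", "rb") as f:
    decoded = decode_seismic_float_data(f.read())
```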
### API Endpoints

- Standard bulk data: `query.entity(native_uid)` → `/{Schema}('{native_uid}')/data`
- Surface grids: `query.entity(native_uid, data_field="$value")` → `/{Schema}('{native_uid}')/$value`
### Accept Header

Most endpoints use the default `Accept: application/json` header. SurfaceGrid `$value` endpoints require `accept="application/octet-stream"`.
### Null Values

Missing or no-data values in arrays are often represented as:

- `-99999.0` for float/double types (see the masking sketch below)
- Check the data documentation for specific sentinel values
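When a dataset uses the `-99999.0` sentinel, replacing it with NaN before analysis keeps NumPy statistics honest. A minimal sketch (`array` is the decoded grid from Example 1):

```python
import numpy as np

# Replace the -99999.0 no-data sentinel with NaN so nan-aware stats skip it
clean = np.where(array == -99999.0, np.nan, array)
print(f"Mean depth: {np.nanmean(clean):.2f}")
```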
## Migration from Older Versions

Prior to version 0.5.0, there were separate `get_entity_data()` and `get_entity_data_stream()` methods. In version 0.5.0, these were replaced by `get_bulk_data()` and `get_bulk_data_stream()` with keyword arguments. Since then, the API has been further simplified to use `QueryBuilder.entity()`:
```python
# OLD (removed in v0.5.0):
binary_data = client.get_entity_data(horizon, schema=HorizonData3D, query=query)

# v0.5.0 (deprecated):
binary_data = client.get_bulk_data(schema=HorizonData3D, native_uid=horizon, query=query)

# NEW (current):
bulk_query = query.entity(horizon.native_uid)
binary_data = client.get_bulk_data(bulk_query)
```
The new pattern uses `QueryBuilder` to hold all context (model, district, project, schema, entity) in one place, eliminating the need for separate parameters.
## See Also
- Query Builder Guide - Building OData queries
- dsis-schemas Documentation - Complete protobuf decoder reference