ReStream Datastore SDK

This project provides a convenient Python SDK for interacting with the ReStream API. With this SDK, you can:

  • check the configuration of your Sites and Pads and receive updates in real time
  • get information about the current and historical stages of the Sites with additional aggregated metrics
  • retrieve raw or sampled data from your Sites and Pads
  • get a list of changes that were applied to the data, download this data, and confirm that these changes were received
  • connect to a WebSocket room associated with a specific Site or Pad and receive new data in real time

Installation and Build

Install from PyPI:

pip install restreamsolutions

Build a wheel package from sources:

# Requires Poetry to be installed
# Run from the repository root
./build.sh

Where to find the .whl file:

  • After running the script, the built artifact will be in the dist/ directory.
  • The name will look like: restreamsolutions-<version>-py3-none-any.whl.

Use in another project (local installation from the .whl file):

# From the other project's directory
pip install /full/path/to/restreamsolutions-<version>-py3-none-any.whl

Install for local development from sources (optional):

# Requires Poetry to be installed
poetry install

Setup

Set the environment variable RESTREAM_AUTH_TOKEN to the authorization token you obtained from ReStream. Alternatively, skip the environment variable and pass the token directly to the SDK classes and methods through the auth_token parameter.

import os
from restreamsolutions import Pad

os.environ["RESTREAM_AUTH_TOKEN"] = "your token"
pads = Pad.get_models()
# Or provide auth token directly to the method
pads = Pad.get_models(auth_token="your token")
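Hard-coding the token in source, as in the snippet above, is handy for a quick test, but in practice you would usually export RESTREAM_AUTH_TOKEN in your shell and read it at startup. The sketch below uses a hypothetical helper, require_auth_token() (not part of the SDK), that fails fast with a clear message when no token is available. It assumes an explicitly passed token should take precedence over the environment variable; the SDK's own precedence is not stated here.

```python
import os


def require_auth_token(explicit_token=None, env_var="RESTREAM_AUTH_TOKEN"):
    """Return the token to use, preferring an explicitly passed value.

    Hypothetical helper, not part of the SDK. The precedence (argument first,
    then the environment variable) is an assumption made for this sketch.
    """
    if explicit_token:
        return explicit_token
    token = os.environ.get(env_var)
    if not token:
        raise RuntimeError(f"Set {env_var} or pass an auth token explicitly")
    return token
```

For example, call token = require_auth_token() once and then pass it on with Pad.get_models(auth_token=token).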

Usage

You will primarily interact with two classes, Site and Pad, which provide access to all the information available through the ReStream API. Set your authorization token at the very top of your code file, as shown above; the examples below omit this step for brevity.

Fetching Sites and Pads

Get a list of all models

You can retrieve a list of all Pads or Sites available to you.

By default, the SDK returns a list of Pad or Site objects. When the HTTP response is deserialized, every field in it becomes an attribute of the object, and fields containing timestamps are automatically converted to timezone-aware Python datetime objects.

from restreamsolutions import Pad, Site

# Get a list of all pads as Pad objects
pads = Pad.get_models()
for pad in pads:
    print(f'pad id: {pad.id}, pad name: {pad.name}, simops config: {pad.simops_config}')

# Similarly, for sites
sites = Site.get_models()

Alternatively, you can get the raw response from the endpoint as a list of Python dict objects by setting as_dict=True.

from restreamsolutions import Pad, Site

# Get a list of all pads as dicts
pads_as_dict = Pad.get_models(as_dict=True)
for pad in pads_as_dict:
    print(f'pad id: {pad["id"]}, pad name: {pad["name"]}, simops config: {pad["simops_config"]}')

# Similarly, for sites
sites_as_dict = Site.get_models(as_dict=True)

Get a single model by ID

If you know the ID of a Site or Pad, you can fetch a specific object using the get_model class method.

from restreamsolutions import Pad, Site

# Get a specific Pad object by its ID
pad = Pad.get_model(id=668)

# Get a specific Site object by its ID
site = Site.get_model(id=981)

# You can also get the model as a dict
pad_as_dict = Pad.get_model(id=668, as_dict=True)
site_as_dict = Site.get_model(id=981, as_dict=True)

Working with object instances vs. dictionaries

You can instantiate a class object directly from a dictionary. The class constructor will assign all attributes and perform conversions, such as turning date strings into datetime objects.

from restreamsolutions import Site

site_as_dict = Site.get_model(id=981, as_dict=True)
site = Site(**site_as_dict)
print(f'date as datetime: {site.date_created}')
print(f'date as a string: {site_as_dict["date_created"]}')
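The conversion the constructor performs can be pictured with the following sketch. Record and parse_timestamp() are hypothetical names, not SDK classes. The rule for recognizing timestamp fields (names starting with date_ or ending in _date) and the UTC default for strings without an offset are assumptions; the SDK's actual rules are not documented here.

```python
from datetime import datetime, timezone


def parse_timestamp(text):
    """Parse an ISO 8601 string into a timezone-aware datetime."""
    # datetime.fromisoformat() accepts a trailing "Z" only on Python 3.11+
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"
    parsed = datetime.fromisoformat(text)
    # Assumption: timestamps without an offset are in UTC
    return parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc)


class Record:
    """Hypothetical stand-in for a Site/Pad model built from an API dict."""

    def __init__(self, **fields):
        for name, value in fields.items():
            # Assumption: timestamp fields are recognized by their name
            is_timestamp = name.startswith("date_") or name.endswith("_date")
            if is_timestamp and isinstance(value, str):
                value = parse_timestamp(value)
            setattr(self, name, value)
```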

Working with Object Relationships

Getting child objects (Sites from a Pad)

To get all Sites belonging to a Pad, you can use the get_sites() method. For efficiency, it's better to instantiate the Pad with just its ID and then call the method.

from restreamsolutions import Pad

# This approach makes two API calls: one to get the Pad, another to get its Sites.
sites_v1 = Pad.get_model(id=668).get_sites()

# This is more efficient, making only one API call to get the Sites.
sites_v2 = Pad(id=668).get_sites()

When you create an object with only an id, it won't have other attributes populated. You can still use its instance methods to fetch related data. To load the object's own attributes, use the update() or aupdate() (async) method.

from restreamsolutions import Pad

pad = Pad(id=668)
print(f"The API call to get the pad has not been made yet. Pad name: {getattr(pad, 'name', None)}")

# But we can still call any instance method
sites = pad.get_sites()
print(f'Successfully fetched sites: {sites}')

# Now, load the pad's own attributes
pad.update()
print(f"The update() method made an API call and loaded all attributes. Pad name: {pad.name}")
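The id-only pattern works because instance methods need nothing but the id, while update() performs the request that fills in the remaining attributes. The sketch below models that behavior with a hypothetical LazyModel class and an injected fetch function standing in for the HTTP call; neither is part of the SDK.

```python
class LazyModel:
    """Hypothetical sketch of an object created from an id alone.

    `fetch` stands in for the HTTP request that loads the object's fields;
    it is not part of the SDK.
    """

    def __init__(self, id, fetch):
        self.id = id
        self._fetch = fetch
        self.api_calls = 0  # number of simulated API requests so far

    def update(self):
        # One request loads every field and sets it as an attribute
        self.api_calls += 1
        for name, value in self._fetch(self.id).items():
            setattr(self, name, value)
        return self
```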

Getting parent objects (Pad from a Site)

To get the parent Pad for a Site, use the get_pad() instance method. The as_dict parameter is also available.

from restreamsolutions import Site

pad = Site(id=981).get_pad()
pad_as_dict = Site(id=981).get_pad(as_dict=True)

Getting and Monitoring Site State

Getting the current state

To get the current configuration of a specific site or all sites on a pad, use the get_state() and get_states() methods.

from restreamsolutions import Site, Pad

pad = Pad(id=681)
site = Site(id=981)

# Returns a single State object or None if the site is not configured
site_state = site.get_state()

# Returns a list of State objects for each configured site on the pad
pad_states = pad.get_states()

# You can also use the as_dict parameter
site_state_json = site.get_state(as_dict=True)
pad_states_json = pad.get_states(as_dict=True)

Filtering states

You can filter the State objects by stage name using StageNameFilters.

from restreamsolutions import Pad, StageNameFilters

pad = Pad(id=668)
frac_states = pad.get_states(stage_name_filter=StageNameFilters.FRAC)

Monitoring state for real-time updates

Because a site's current state changes over time, you can poll it by calling the update() method on the State object. However, there is a better approach; see the “Real-time Sites and Pads updates” section below.

import time
from restreamsolutions import Site

state = Site(id=981).get_state()
if state:
    for _ in range(3):
        print(f'State: {state.current_state}, '
              f'Stage number: {state.calculated_stage_number}, '
              f'Last update: {state.last_state_update}')
        time.sleep(60)
        state.update()  # Refresh the state object with the latest data

Getting Historical Stages Data

The get_stages_metadata() and aget_stages_metadata() (async) methods allow you to retrieve information about previous stages with optional filters.

from datetime import datetime, timezone
from restreamsolutions import Site, StageNameFilters

site = Site(id=1113)
start_date = datetime(2025, 10, 1, 0, 0, 0, tzinfo=timezone.utc)
end_date = datetime(2025, 10, 18, 0, 0, 0, tzinfo=timezone.utc)

# Get stages within a date range
stages_from_range = site.get_stages_metadata(start=start_date, end=end_date)
for stage in stages_from_range[:3]:
    print(f"ID: {stage['id']}, State: {stage['state']}, Start: {stage['start']}")

# Get stages with a specific stage number and stage name
stages_by_number = site.get_stages_metadata(stage_number=1, stage_name_filter=StageNameFilters.WIRELINE)

You can also include aggregated metrics for each stage by setting add_aggregations=True.

from datetime import datetime, timezone
from restreamsolutions import Site, StageNameFilters

site = Site(id=1113)
start_date = datetime(2025, 9, 17, 0, 0, 0, tzinfo=timezone.utc)
end_date = datetime(2025, 9, 18, 0, 0, 0, tzinfo=timezone.utc)
stages_with_aggregations = site.get_stages_metadata(
    start_date=start_date,
    end_date=end_date,
    stage_name_filter=StageNameFilters.FRAC,
    add_aggregations=True
)
for stage in stages_with_aggregations[:3]:
    print(f"ID: {stage['id']}, Aggregations: {stage['aggregations']}")
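The examples above build timezone-aware UTC datetimes by hand. A small hypothetical helper, utc_window() (not part of the SDK), can produce the same kind of range relative to the current time or to a given end time. Rejecting naive datetimes is a precaution in this sketch, not a documented SDK requirement.

```python
from datetime import datetime, timedelta, timezone


def utc_window(days, end=None):
    """Return a (start, end) pair of timezone-aware UTC datetimes spanning `days` days.

    Hypothetical helper, not part of the SDK. `end` defaults to the current time.
    """
    if end is None:
        end = datetime.now(timezone.utc)
    elif end.tzinfo is None:
        # Precaution: refuse naive datetimes instead of guessing their time zone
        raise ValueError("end must be timezone-aware")
    end = end.astimezone(timezone.utc)
    return end - timedelta(days=days), end
```

Pass the returned values as the start and end of the range in the get_stages_metadata() calls shown above.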

Getting Metadata

Measurement Sources Metadata

Retrieve metadata about measurement sources for an entire pad or a specific site.

from restreamsolutions import Pad, Site

# For an entire pad
measurement_sources = Pad(id=681).get_measurement_sources_metadata()
print(f'Pad measurement sources: {measurement_sources}')

# For a specific site
site = Site(id=1111)
measurement_sources = site.get_measurement_sources_metadata()
print(f'Site measurement sources: {measurement_sources}')

Fields Metadata

Get a list of available data field names for a pad or site. These names can be used to filter data retrieval.

from restreamsolutions import Pad, Site

pad = Pad(id=681)
pad_fields = pad.get_fields_metadata()
print(f'Pad fields: {pad_fields}')

site = Site(id=981)
site_fields = site.get_fields_metadata()
print(f'Site fields: {site_fields}')
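Before passing field names to a data request (for example through the fields parameter used in the real-time measurements example), you may want to check them against this list so that typos fail early. The sketch assumes get_fields_metadata() yields plain field-name strings; check_fields() is a hypothetical helper, not an SDK function.

```python
def check_fields(requested, available):
    """Return `requested` as a list, raising ValueError for unknown field names.

    Hypothetical helper; assumes `available` is an iterable of field-name strings.
    """
    unknown = sorted(set(requested) - set(available))
    if unknown:
        raise ValueError(f"Unknown fields: {', '.join(unknown)}")
    return list(requested)
```

For example, fields = check_fields(["down_hole_pressure", "slurry_rate"], site_fields), then pass fields=fields to the data request.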

Fetching Time-Series Data

To get data for a pad or site, use the get_data() or aget_data() (async) method. These methods return lazy Data or DataAsync objects, which can be used for streaming or saving to a file.

For pads, the is_routed parameter (default False) controls whether the data is split among the individual sites it belongs to (True) or returned for the pad as a whole (False). See the get_data() documentation for more details.

from datetime import datetime, timezone
from restreamsolutions import Pad, StageNameFilters

pad = Pad(id=681)

data_obj = pad.get_data(
    start_datetime=datetime(2025, 9, 9, tzinfo=timezone.utc),
    end_datetime=datetime(2025, 9, 9, minute=1, tzinfo=timezone.utc),
    is_routed=True,
    stage_name_filter=StageNameFilters.FRAC
)

Streaming data

The download starts the first time you iterate over the data_fetcher generator, so you can process records as they arrive instead of waiting for the full download.

from datetime import datetime, timezone
from restreamsolutions import Pad, StageNameFilters

pad = Pad(id=681)

data_obj = pad.get_data(
    start_datetime=datetime(2025, 9, 9, tzinfo=timezone.utc),
    end_datetime=datetime(2025, 9, 9, minute=1, tzinfo=timezone.utc),
    is_routed=True,
    stage_name_filter=StageNameFilters.FRAC
)

for one_second_item in data_obj.data_fetcher:
    print(one_second_item)
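The laziness described above is the ordinary behavior of a Python generator: its body does not run until the first item is requested. The sketch below imitates it with a hypothetical LazyData class whose load_pages callable stands in for the download; it illustrates the idea and is not the SDK's implementation.

```python
class LazyData:
    """Hypothetical imitation of a lazy Data object (not the SDK's code).

    `load_pages` stands in for the download and returns an iterable of pages,
    each page being a list of records.
    """

    def __init__(self, load_pages):
        self._load_pages = load_pages
        self.started = False  # becomes True once the download has begun

    @property
    def data_fetcher(self):
        # Calling a generator function only creates the generator; its body
        # (and therefore the "download") runs on the first next() call.
        return self._fetch()

    def _fetch(self):
        self.started = True
        for page in self._load_pages():
            yield from page
```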

Saving data to a file

You can save the data to a JSON or CSV file. The format is chosen by the file extension (.json or .csv). If overwrite=False (the default) and the file already exists, a FileExistsError will be raised.

from datetime import datetime, timezone
from restreamsolutions import Pad, StageNameFilters

pad = Pad(id=681)

data_obj = pad.get_data(
    stage_number=1,
    is_routed=True,
    stage_name_filter=StageNameFilters.FRAC
)

# Save as JSON
data_obj.save('./data/data.json', overwrite=True)

# Or save as CSV
data_obj.save('./data/data.csv', overwrite=True)
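The documented save() rules (format chosen by extension, FileExistsError unless overwrite=True) can be reproduced for your own record lists with the standard json and csv modules. save_records() is a hypothetical helper, not the SDK's code. It also creates missing parent folders, which the SDK's save() may or may not do, and it assumes flat dict records for CSV output.

```python
import csv
import json
from pathlib import Path


def save_records(records, path, overwrite=False):
    """Write a list of flat dicts to .json or .csv based on the file extension.

    Hypothetical helper mirroring the documented save() rules; not the SDK's code.
    """
    path = Path(path)
    suffix = path.suffix.lower()
    if suffix not in (".json", ".csv"):
        raise ValueError(f"Unsupported extension {suffix!r}; use .json or .csv")
    if path.exists() and not overwrite:
        raise FileExistsError(f"{path} already exists; pass overwrite=True")
    path.parent.mkdir(parents=True, exist_ok=True)  # this sketch creates missing folders
    if suffix == ".json":
        # default=str turns values such as datetimes into strings
        path.write_text(json.dumps(records, default=str))
    else:
        # The header is the union of keys; missing values become empty cells
        fieldnames = sorted({key for record in records for key in record})
        with path.open("w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(records)
```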

Handling Data Changes

Occasionally, historical data may be corrected. You can get information about which sites and time periods were affected and download only the updated data.

The get_data_changes() method (and its async version aget_data_changes()) is available for Site and Pad classes. It returns a tuple containing a list of DataChange objects and a single Data or DataAsync object for fetching the corresponding data.

After receiving and processing the changes, you need to confirm their retrieval. This ensures that the next time you check for data changes, the ones already processed will be excluded. To do this, call confirm_data_received() (or aconfirm_data_received() for the async version) on the DataChange objects you have handled.

from restreamsolutions import Pad

pad = Pad(id=668)
change_events, changed_data = pad.get_data_changes()

# Inspect the change events
for event in change_events:
    print(f"ID: {event.id}, "
          f"Type: {event.modification_type}, "
          f"Start: {event.start_date}, "
          f"End: {event.end_date}")

# Fetch and process the data for all changes combined
for one_second_item in changed_data.data_fetcher:
    print(one_second_item)

# Save the changed data to a JSON file
changed_data.save('./data/data_changes.json', overwrite=True)
# Save the changed data to a CSV file
changed_data.save('./data/data_changes.csv', overwrite=True)

# Confirm that all data change events have been received and processed
for event in change_events:
    event.confirm_data_received()
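If you need the overall time span the changes cover (for example, to decide which downstream aggregates to recompute), you can derive it from the start_date and end_date attributes shown above. ChangeEvent is a hypothetical stand-in for DataChange, used only to make the sketch runnable. The helpers rely only on those attribute names, so they should also accept the objects returned by get_data_changes().

```python
from collections import Counter
from dataclasses import dataclass
from datetime import datetime


@dataclass
class ChangeEvent:
    """Hypothetical stand-in for DataChange with the attributes used above."""

    id: int
    modification_type: str
    start_date: datetime
    end_date: datetime


def affected_window(events):
    """Return (earliest start_date, latest end_date), or None for no events."""
    if not events:
        return None
    return min(e.start_date for e in events), max(e.end_date for e in events)


def count_by_type(events):
    """Count events per modification_type."""
    return Counter(e.modification_type for e in events)
```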

You can also fetch the data for a single, specific change event.

from restreamsolutions import Pad

pad = Pad(id=668)
change_events, _ = pad.get_data_changes()

if change_events:
    first_change_event = change_events[0]
    first_event_data = first_change_event.get_data()

    for one_second_item in first_event_data.data_fetcher:
        print(one_second_item)

    # Confirm that you have received and processed the change event
    first_change_event.confirm_data_received()
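Because confirmed events are excluded from later get_data_changes() calls, it is safer to confirm an event only after its processing has succeeded. The sketch below shows that pattern. process_and_confirm() and the handle callback are hypothetical; the only SDK method it relies on is confirm_data_received(). Events whose handler raised are left unconfirmed and returned, so they should be reported again on the next check.

```python
def process_and_confirm(events, handle):
    """Run `handle` on each event and confirm only the ones that succeeded.

    Hypothetical helper: `handle` is your own processing function. Returns the
    events whose handler raised; they are left unconfirmed.
    """
    failed = []
    for event in events:
        try:
            handle(event)
        except Exception:  # broad on purpose: any failure leaves the event unconfirmed
            failed.append(event)
            continue
        event.confirm_data_received()
    return failed
```

For example, failed = process_and_confirm(change_events, my_handler), where my_handler is your own function that loads and stores event.get_data().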

Real-time data via WebSockets

The SDK supports receiving data in real time over WebSocket. The following stream types and methods are available:

  • Data change events (metadata only): get_realtime_data_changes_updates() / aget_realtime_data_changes_updates()
  • Site/Pad instance updates (includes site states updates): get_realtime_instance_updates() / aget_realtime_instance_updates()
  • Measurement streams: get_realtime_measurements_data() / aget_realtime_measurements_data()

Common behavior and tips:

  • All methods return lazy Data/DataAsync objects; read incoming messages by iterating over data_fetcher.
  • Use get_* for synchronous code, and aget_* for asynchronous code.
  • Streams can be saved to files using the save(...)/asave(...) method on the returned Data/DataAsync.
  • For measurement streams, a session_key is also returned so that you can resume reading the queue after a process restart — details are provided in the dedicated section below.
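For the aget_* variants, a common need is to read a bounded number of messages without waiting forever on a quiet stream. The sketch below assumes DataAsync.data_fetcher supports async iteration (async for), which this page does not state explicitly. take_async() is a hypothetical helper built only on asyncio.wait_for and the standard async-iterator protocol.

```python
import asyncio


async def take_async(stream, limit, timeout):
    """Collect up to `limit` items from an async iterator.

    Waits at most `timeout` seconds for each item and stops early when the
    stream ends or goes quiet. Hypothetical helper, not part of the SDK.
    """
    iterator = stream.__aiter__()
    items = []
    while len(items) < limit:
        try:
            item = await asyncio.wait_for(iterator.__anext__(), timeout)
        except (StopAsyncIteration, asyncio.TimeoutError):
            break
        items.append(item)
    return items
```

For example, inside a coroutine: messages = await take_async(updates.data_fetcher, limit=10, timeout=30).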

Detailed subsections for each real-time option are provided below.

Real-time data change events

In addition to periodically checking for changes via get_data_changes()/aget_data_changes(), you can subscribe to a live stream of data-change events over WebSocket using get_realtime_data_changes_updates() and aget_realtime_data_changes_updates() on Site and Pad objects.

Important: these methods yield data-change events as Python dict objects containing metadata only; the changed data itself is not included. To load the actual records, call get_data_changes() or aget_data_changes() on the Site/Pad. These return the list of change events together with a Data/DataAsync object that you can iterate over to read the affected records or use to save them to a file.

from restreamsolutions import Pad

pad = Pad(id=681)
updates = pad.get_realtime_data_changes_updates()

for event in updates.data_fetcher:  # event is a dict with change metadata (id, modification_type, etc.)
    print(event)

Real-time Sites and Pads updates

You can subscribe to a continuous stream of real-time updates for a Pad (or Site) via WebSocket. The method returns a lazy Data/DataAsync object whose data_fetcher yields updates one by one. Use get_realtime_instance_updates() and aget_realtime_instance_updates() methods of the Pad and Site classes.

from restreamsolutions import Pad

pad = Pad(id=681)
updates = pad.get_realtime_instance_updates()

# Iterate over incoming messages (blocking loop)
for message in updates.data_fetcher:
    print(message)
    # Add your own break condition if needed
    # if should_stop():
    #     break

# You can also persist streamed updates as JSON
# updates.save('./data/pad_realtime_updates.json', overwrite=True)
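One way to implement the break condition mentioned in the comments is a small generator that stops after a number of messages or a time budget. take_for() below is a hypothetical helper, not part of the SDK. Because the time limit is checked only when a message arrives, a silent stream can still block past the deadline.

```python
import itertools
import time


def take_for(stream, max_items=None, max_seconds=None):
    """Yield items from a possibly endless iterator until a limit is reached.

    Hypothetical helper, not part of the SDK. The time budget is checked only
    after each item, so a silent stream can still block past the deadline.
    """
    deadline = None if max_seconds is None else time.monotonic() + max_seconds
    # islice(stream, None) imposes no count limit
    for item in itertools.islice(stream, max_items):
        yield item
        if deadline is not None and time.monotonic() >= deadline:
            return
```

For example: for message in take_for(updates.data_fetcher, max_items=100, max_seconds=300): print(message).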

Real-time Measurements Data

Use the following methods to open a WebSocket stream with measurements for a Site or Pad:

  • Pad: get_realtime_measurements_data() and aget_realtime_measurements_data()
  • Site: get_realtime_measurements_data() and aget_realtime_measurements_data()

These methods accept the same filter parameters as get_data()/aget_data().

Important behavior of filters:

  • If you DO pass any filters (for example start_datetime/end_datetime, fields, stage filters, etc.), the stream will first replay the historical data that matches those filters and then continue with real-time updates.
  • If you DO NOT pass any filters, only fresh real-time updates will be delivered. No historical backlog will be sent.

from datetime import datetime, timezone
from restreamsolutions import Pad, StageNameFilters

pad = Pad(id=681)

# With filters: first get historical, then live
stream, session_key = pad.get_realtime_measurements_data(
    start_datetime=datetime(2025, 9, 9, tzinfo=timezone.utc),
    end_datetime=datetime(2025, 9, 9, minute=1, tzinfo=timezone.utc),
    is_routed=True,
    stage_name_filter=StageNameFilters.FRAC,
    fields=["down_hole_pressure", "slurry_rate"]
)
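# This loop does not end on its own: after the historical replay, the stream
# continues with live updates. Add a break condition if you need to stop.
for item in stream.data_fetcher:
    print(item)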

# Without filters: live only (no historical replay)
live_only_stream, live_session_key = pad.get_realtime_measurements_data()
for item in live_only_stream.data_fetcher:
    print(item)

VERY IMPORTANT: session_key usage

  • The SDK maintains resilient WebSocket connections and will automatically reuse the same session_key to continue reading from the same message queue after transient network errors or normal closes (when restart flags are enabled).
  • If your whole Python process crashes or is restarted, you may want to resume from where you left off to avoid missing updates. To do so, persist the session_key returned as the second value from the method call (e.g., data, session_key = pad.get_realtime_measurements_data(...)) in a durable store (database, etc.), and supply it on the next start.
  • Never create multiple concurrent connections that use the same session_key. Doing so can lead to incorrect results or duplicated messages for each connected client.
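A durable store for the session_key can be as simple as a SQLite table keyed by stream name. The sketch below uses only the standard sqlite3 module; the table name, the stream_name label, and the assumption that session_key is a string are all illustrative. This page does not show the argument used to pass a saved session_key back to get_realtime_measurements_data(), so check that method's documentation. Keep one stream name per running process so that two connections never share a key.

```python
import sqlite3

_SCHEMA = (
    "CREATE TABLE IF NOT EXISTS session_keys ("
    "stream TEXT PRIMARY KEY, session_key TEXT NOT NULL)"
)


def save_session_key(conn, stream_name, session_key):
    """Store (or replace) the session_key for one stream and commit immediately."""
    conn.execute(_SCHEMA)
    conn.execute(
        "INSERT OR REPLACE INTO session_keys (stream, session_key) VALUES (?, ?)",
        (stream_name, session_key),
    )
    conn.commit()


def load_session_key(conn, stream_name):
    """Return the saved session_key for a stream, or None if there is none."""
    conn.execute(_SCHEMA)
    row = conn.execute(
        "SELECT session_key FROM session_keys WHERE stream = ?", (stream_name,)
    ).fetchone()
    return row[0] if row else None
```

For example, after data, session_key = pad.get_realtime_measurements_data(...), call save_session_key(conn, "pad-681", session_key), and read it back with load_session_key(conn, "pad-681") on the next start.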

Running tests

Run from the repository root:

pytest

License

This project is distributed under the MIT License. See the LICENSE file.

Download files

Source Distributions

No source distribution files are available for this release.

Built Distribution

restreamsolutions-0.1.0-py3-none-any.whl (42.5 kB)

Uploaded Python 3

File details

Details for the file restreamsolutions-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: restreamsolutions-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 42.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.10.11 Windows/10

File hashes

Hashes for restreamsolutions-0.1.0-py3-none-any.whl:

  • SHA256: b841317add8c4412c676899e0957c315947cc8045c65fb64be318b5b0123b0e3
  • MD5: a1176fbae8a8c59b49a3ed2679678ebf
  • BLAKE2b-256: 656a068870f7dd9f25b061ce9d345165b9abe07abda8dab800ff8544dba9d5bc
