Python client for Hugr Arrow IPC over multipart/mixed

This is a Python client for the Hugr IPC protocol.

Hugr is a Data Mesh platform for querying and exploring data from various sources in a unified way. It provides a GraphQL interface to data from different sources, such as databases, APIs, and files. hugr-client is a Python client for the Hugr platform that queries data from a Hugr server and processes it in a Pythonic way.

For more information about the Hugr platform, please visit the Hugr website or the Hugr GitHub repository.

The client requests data from a Hugr server and returns the results in Pythonic form, for example as pandas or geopandas dataframes. For efficient data transmission, the client communicates with the server over the Hugr IPC protocol (Arrow IPC over multipart/mixed).
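To make the multipart/mixed framing more concrete, here is a minimal sketch that splits such a body into its parts using only the standard library. It is not the client's implementation and does not reproduce the actual Hugr wire format: the `split_multipart` helper, the `frame` boundary, the `X-Example-Path` header, and the JSON payloads are all assumptions made for illustration. Real responses would carry binary Arrow IPC payloads, for which a dedicated multipart decoder (such as the one shipped in requests_toolbelt, which appears in the dependency list) is likely a better fit than the email parser.

```python
from email.parser import BytesParser
from email.policy import default


def split_multipart(content_type: str, body: bytes) -> list[tuple[dict[str, str], bytes]]:
    """Split a multipart/mixed body into (headers, payload) pairs."""
    # The parser learns the boundary from the Content-Type header,
    # so prepend that header to the raw body before parsing.
    head = f"Content-Type: {content_type}\r\n\r\n".encode("ascii")
    message = BytesParser(policy=default).parsebytes(head + body)
    parts = []
    for part in message.iter_parts():
        headers = {name.lower(): str(value) for name, value in part.items()}
        parts.append((headers, part.get_payload(decode=True)))
    return parts


# Hand-built body; "X-Example-Path" is a made-up header, not part of Hugr.
body = (
    b"--frame\r\n"
    b"Content-Type: application/json\r\n"
    b"X-Example-Path: data.devices\r\n"
    b"\r\n"
    b'{"rows": 2}\r\n'
    b"--frame\r\n"
    b"Content-Type: application/json\r\n"
    b"X-Example-Path: data.drivers\r\n"
    b"\r\n"
    b'{"rows": 1}\r\n'
    b"--frame--\r\n"
)

parts = split_multipart('multipart/mixed; boundary="frame"', body)
for headers, payload in parts:
    print(headers["x-example-path"], payload)
```

Each part carries its own headers, which is what lets a single HTTP response deliver several independent result sets for one GraphQL query.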

Installation

pip install hugr-client

or

uv pip install hugr-client

Usage

import hugr

# connect to the server
client = hugr.Client("http://localhost:15001/ipc")

# query data
data = client.query("""
    {
        devices {
            id
            name
            geom
            last_seen{
                time
                value
            }
        }
        drivers {
            id
            name
            devices {
                id
                name
                geom
                last_seen{
                    time
                    value
                }
            }
        }
        drivers_by_pk(id: "driver_id") {
            id
            name
            devices {
                id
                name
                geom
                last_seen{
                    time
                    value
                }
            }
        }
    }
""")

# get results as a pandas dataframe
df = data.df('data.devices') # or df = data["data.devices"].df()

# get results as a geopandas dataframe
gdf = data.gdf('data.devices', 'geom') # or gdf = data["data.devices"].gdf("geom")

# if the geometry field is nested inside objects or arrays, `gdf` flattens the data until the geometry field is found
# the field name is optional if the data has only one geometry field
gdf = data.gdf('data.drivers', 'devices.geom') # or gdf = data["data.drivers"].gdf("devices.geom")

# get record as a dictionary
d = data.record('data.drivers_by_pk')

# operate on parts of the results
part = data["data.drivers_by_pk"]

# get pandas dataframe from the record
df = data.df('data.drivers_by_pk') # or df = part.df()

# get geopandas dataframe from the record; the dataframe will be flattened until the geometry field is found
gdf = data.gdf('data.drivers_by_pk', 'devices.geom') # or gdf = part.gdf("devices.geom") or gdf = part.gdf() if only one geometry field is present

# explore geographic data in Jupyter (JupyterLab or Notebook)
data.explore_map() # or part.explore_map() or hugr.explore_map(data) or hugr.explore_map(part)
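The dotted result paths (such as 'data.devices') and the "flatten until the geometry field is found" behaviour described in the comments above can be pictured with plain dictionaries. The sketch below is only an illustration of the idea and not the library's implementation: `resolve_path` and `flatten_to_field` are hypothetical helpers, the sample data is made up, and geometries are shown as WKT strings for readability.

```python
def resolve_path(result: dict, path: str):
    """Walk a nested dict along a dotted path such as 'data.drivers'."""
    node = result
    for key in path.split("."):
        node = node[key]
    return node


def flatten_to_field(records: list[dict], field_path: str) -> list[dict]:
    """Flatten nested objects and arrays along field_path.

    Produces one row per innermost record that holds the target field;
    parent columns are repeated and nested columns get a dotted prefix.
    """
    head, _, rest = field_path.partition(".")
    rows = []
    for rec in records:
        if not rest:
            # The target field lives at this level: keep the record as-is.
            rows.append(dict(rec))
            continue
        parent = {k: v for k, v in rec.items() if k != head}
        value = rec.get(head)
        children = value if isinstance(value, list) else [value]
        children = [c for c in children if c is not None]
        for child in flatten_to_field(children, rest):
            row = dict(parent)
            row.update({f"{head}.{k}": v for k, v in child.items()})
            rows.append(row)
    return rows


# Made-up response shaped like the query above.
result = {
    "data": {
        "drivers": [
            {
                "id": "d1",
                "devices": [
                    {"id": 1, "geom": "POINT (0 0)"},
                    {"id": 2, "geom": "POINT (1 1)"},
                ],
            }
        ]
    }
}

flat = flatten_to_field(resolve_path(result, "data.drivers"), "devices.geom")
for row in flat:
    print(row)
```

One driver with two devices becomes two rows, each with its own `devices.geom` value, which is the shape a geodataframe needs: one geometry per row.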

Connection parameters

  • url - the url of the hugr server
  • api_key - the api key for the hugr server (if using api key authentication)
  • token - the token for the hugr server (if using token authentication)
  • role - the role for the hugr server (if the user has several roles in the token)

It also supports querying with preconfigured connection parameters.

The parameters are read from the following environment variables:

  • HUGR_URL - the url of the hugr server
  • HUGR_API_KEY - the api key for the hugr server (if using api key authentication)
  • HUGR_TOKEN - the token for the hugr server (if using token authentication)
  • HUGR_API_KEY_HEADER - the header name for the api key (if using api key authentication)
  • HUGR_ROLE_HEADER - the header name for the role (if the user has several roles in the token)

import hugr

hugr.query(
    query="""
        {
            devices {
                id
                name
                geom
                last_seen{
                    time
                    value
                }
            }
        }
    """
)
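As a rough sketch of how the environment variables listed above could be gathered into connection settings, the snippet below reads them from a mapping. It only illustrates the configuration step and is not how hugr-client resolves them internally: the `settings_from_env` helper and the setting names on the left side of `ENV_VARS` are assumptions, while the variable names come from the list above.

```python
import os
from typing import Mapping

# Setting name (label used only in this sketch) -> environment variable.
ENV_VARS = {
    "url": "HUGR_URL",
    "api_key": "HUGR_API_KEY",
    "token": "HUGR_TOKEN",
    "api_key_header": "HUGR_API_KEY_HEADER",
    "role_header": "HUGR_ROLE_HEADER",
}


def settings_from_env(environ: Mapping[str, str] = os.environ) -> dict[str, str]:
    """Collect the HUGR_* variables that are set and non-empty."""
    return {name: environ[var] for name, var in ENV_VARS.items() if environ.get(var)}


# A plain dict stands in for os.environ so the example has no side effects.
example_env = {
    "HUGR_URL": "http://localhost:15001/ipc",
    "HUGR_API_KEY": "secret",
    "HUGR_TOKEN": "",
}
print(settings_from_env(example_env))
```

In a real session the variables would be exported in the shell or set through `os.environ` before `hugr.query` is called.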

Streaming API

In addition to standard HTTP queries, hugr-client supports asynchronous streaming of data via WebSocket. This allows you to receive large datasets in batches or row-by-row, without waiting for the entire result to be loaded into memory.

Quick Start

import asyncio
from hugr.stream import connect_stream

async def main():
    client = connect_stream("http://localhost:15001/ipc")

    # HTTP query for total count
    result = client.query("query { devices_aggregation { _rows_count } }")
    print("Total devices:", result.record()['_rows_count'])

    # Stream data in batches (Arrow RecordBatch)
    async with await client.stream(
        """
        query {
            devices {
                id
                name
                geom
            }
        }
        """
    ) as stream:
        async for batch in stream.chunks():
            df = batch.to_pandas()
            print("Batch:", len(df), "rows")

    # Stream data row by row
    async with await client.stream(
        "query { devices { id name status } }"
    ) as stream:
        async for row in stream.rows():
            print(row)

asyncio.run(main())

Main Features

  • connect_stream — create a streaming client (WebSocket).
  • client.stream(query, variables=None) — asynchronously get a stream of Arrow RecordBatch for a GraphQL query.
  • stream.chunks() — async generator for batches (RecordBatch).
  • stream.rows() — async generator for rows (dict).
  • stream.to_pandas() — collect all streamed data into a pandas.DataFrame.
  • stream.count() — count the number of rows in the stream.
  • stream_data_object(data_object, fields, variables=None) — stream a specific data object and fields.
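The relationship between batch iteration, row iteration, and counting can be sketched with plain async generators. This is a stand-in and not the hugr streaming client: lists of dicts take the place of Arrow RecordBatch objects, and `fake_chunks`, `rows_from_chunks`, and `count_rows` are hypothetical names.

```python
import asyncio
from typing import AsyncIterator


async def fake_chunks(rows: list[dict], batch_size: int) -> AsyncIterator[list[dict]]:
    """Yield rows in fixed-size batches, as a streaming source would."""
    for start in range(0, len(rows), batch_size):
        await asyncio.sleep(0)  # hand control back to the event loop, like network I/O
        yield rows[start:start + batch_size]


async def rows_from_chunks(chunks: AsyncIterator[list[dict]]) -> AsyncIterator[dict]:
    """Turn a stream of batches into a stream of single rows."""
    async for batch in chunks:
        for row in batch:
            yield row


async def count_rows(chunks: AsyncIterator[list[dict]]) -> int:
    """Count rows without keeping any batch in memory."""
    total = 0
    async for batch in chunks:
        total += len(batch)
    return total


async def demo() -> None:
    devices = [{"id": i, "name": f"device-{i}"} for i in range(5)]
    async for row in rows_from_chunks(fake_chunks(devices, batch_size=2)):
        print(row)
    print("rows:", await count_rows(fake_chunks(devices, batch_size=2)))


asyncio.run(demo())
```

Only one batch is held at a time in each helper, which is the property that makes streaming suitable for results too large to load at once.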

Example: Collect DataFrame via Streaming

import asyncio
from hugr.stream import connect_stream

async def main():
    client = connect_stream("http://localhost:15001/ipc")
    async with await client.stream(
        "query { devices { id name geom } }"
    ) as stream:
        df = await stream.to_pandas()
        print(df.head())

asyncio.run(main())

Example: Row-by-row Processing

import asyncio
from hugr.stream import connect_stream

async def main():
    client = connect_stream()
    async with await client.stream(
        "query { devices { id name status } }"
    ) as stream:
        async for row in stream.rows():
            if row.get("status") == "active":
                print("Active device:", row["name"])

asyncio.run(main())

Example: Query Cancellation

import asyncio
from hugr.stream import connect_stream

async def main():
    client = connect_stream()
    async with await client.stream(
        "query { devices { id name } }"
    ) as stream:
        count = 0
        async for batch in stream.chunks():
            count += batch.num_rows
            if count > 1000:
                await client.cancel_current_query()
                break

asyncio.run(main())

Notes

  • All streaming functions are asynchronous and require async/await.
  • Dependencies: websockets, pyarrow, pandas.
  • You can use both a pure streaming client and an enhanced client with HTTP and WebSocket support.

See more in hugr/stream.py and the code examples in the source files.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Contributing

Contributions are welcome!

Dependencies

  • requests
  • pyarrow
  • pandas
  • geopandas
  • shapely
  • requests_toolbelt
  • numpy