Python client for Hugr Arrow IPC over multipart/mixed
This is a Python client for the Hugr IPC protocol.
Hugr is a Data Mesh platform that lets you query and explore data from various sources in a unified way. It provides a GraphQL interface to data held in different sources, such as databases, APIs, and files. The hugr-client package is a Python client for the Hugr platform that queries data from a Hugr server and returns it in a Pythonic form.
For more information about the Hugr platform, please visit the Hugr website or the Hugr GitHub repository.
The client requests data from the Hugr server and processes it in a Pythonic way. For efficient data transfer, it communicates with the server over the Hugr IPC protocol (Arrow IPC over multipart/mixed).
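To make the "multipart/mixed" part of the protocol more concrete, here is a minimal sketch of how a multipart body can be split into its parts using only the Python standard library. It is not the hugr-client implementation: the boundary, the `X-Demo-Path` header, and the placeholder payloads are all hypothetical, and the real wire layout of the Hugr IPC protocol is not described in this README. A real client would then hand the Arrow parts to an Arrow IPC reader.

```python
from email import message_from_bytes
from email.message import Message


def split_multipart(content_type: str, body: bytes) -> list[Message]:
    """Split a multipart HTTP body into its parts with the stdlib email parser."""
    # The email parser expects headers first, so prepend the Content-Type header
    # that an HTTP response would carry separately from the body.
    raw = b"Content-Type: " + content_type.encode("ascii") + b"\r\n\r\n" + body
    message = message_from_bytes(raw)
    if not message.is_multipart():
        raise ValueError("expected a multipart body")
    return list(message.get_payload())


# Hypothetical response body: one Arrow part and one JSON part.
# "X-Demo-Path" and the payload bytes are placeholders for illustration only.
body = (
    b"--demo-boundary\r\n"
    b"Content-Type: application/vnd.apache.arrow.stream\r\n"
    b"X-Demo-Path: data.devices\r\n"
    b"\r\n"
    b"ARROW-BYTES-1\r\n"
    b"--demo-boundary\r\n"
    b"Content-Type: application/json\r\n"
    b"X-Demo-Path: data.drivers_by_pk\r\n"
    b"\r\n"
    b'{"id": "driver_id"}\r\n'
    b"--demo-boundary--\r\n"
)

parts = split_multipart('multipart/mixed; boundary="demo-boundary"', body)
for part in parts:
    payload = part.get_payload(decode=True)  # raw bytes of this part
    print(part["X-Demo-Path"], part.get_content_type(), len(payload))
```

The sketch uses ASCII placeholders instead of real Arrow data; for genuinely binary payloads a dedicated multipart decoder is usually the safer choice.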
Installation
pip install hugr-client
or
uv pip install hugr-client
Usage
import hugr
# connect to the server
client = hugr.Client("http://localhost:15001/ipc")
# query data
data = client.query("""
{
  devices {
    id
    name
    geom
    last_seen {
      time
      value
    }
  }
  drivers {
    id
    name
    devices {
      id
      name
      geom
      last_seen {
        time
        value
      }
    }
  }
  drivers_by_pk(id: "driver_id") {
    id
    name
    devices {
      id
      name
      geom
      last_seen {
        time
        value
      }
    }
  }
}
""")
# get results as a pandas dataframe
df = data.df('data.devices') # or df = data["data.devices"].df()
# get results as a geopandas dataframe
gdf = data.gdf('data.devices', 'geom') # or gdf = data["data.devices"].gdf("geom")
# if the geometry field is nested inside objects or arrays, `gdf` flattens the data until the geometry field is found
# the field name is optional if the data has only one geometry field
gdf = data.gdf('data.drivers', 'devices.geom') # or gdf = data["data.drivers"].gdf("devices.geom")
# get a record as a dictionary
d = data.record('data.drivers_by_pk')
# work with a part of the results
part = data["data.drivers_by_pk"]
# get a pandas dataframe from the record
df = data.df('data.drivers_by_pk') # or df = part.df()
# get a geopandas dataframe from the record; the dataframe is flattened until the geometry field is found
gdf = data.gdf('data.drivers_by_pk', 'devices.geom') # or gdf = part.gdf("devices.geom"), or gdf = part.gdf() if only one geometry field is present
# explore geographic data in Jupyter (JupyterLab or Jupyter Notebook)
data.explore_map() # or part.explore_map() or hugr.explore_map(data) or hugr.explore_map(part)
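The dotted paths (`'data.devices'`) and the "flatten until the geometry field is found" behaviour described in the comments above can be pictured with plain dictionaries. The following is only a sketch of the idea, not the library's code: the helper names `resolve_path` and `flatten_to_field`, the prefixed column names such as `devices.geom`, and the choice to drop parents that have no nested children are all assumptions made for illustration.

```python
from typing import Any


def resolve_path(result: dict, path: str) -> Any:
    """Walk a nested dict along a dot-separated path such as 'data.drivers'."""
    node: Any = result
    for key in path.split("."):
        node = node[key]
    return node


def flatten_to_field(rows: list[dict], field_path: str) -> list[dict]:
    """Expand nested objects/lists along field_path until its last field is top-level."""
    prefix = ""
    for part in field_path.split(".")[:-1]:
        key = prefix + part
        expanded = []
        for row in rows:
            nested = row.get(key)
            if nested is None:
                continue  # assumption: rows without the nested object are dropped
            children = nested if isinstance(nested, list) else [nested]
            parent = {k: v for k, v in row.items() if k != key}
            for child in children:
                # Prefix child keys so they cannot clash with the parent's keys.
                expanded.append({**parent, **{f"{key}.{k}": v for k, v in child.items()}})
        rows = expanded
        prefix = key + "."
    return rows


# Hypothetical query result with geometry stored as WKT text.
result = {"data": {"drivers": [
    {"id": "d1", "name": "Ann", "devices": [
        {"id": "dev1", "geom": "POINT (1 2)"},
        {"id": "dev2", "geom": "POINT (3 4)"},
    ]},
    {"id": "d2", "name": "Bob", "devices": []},
]}}

drivers = resolve_path(result, "data.drivers")
flat = flatten_to_field(drivers, "devices.geom")
for row in flat:
    print(row)
```

Each driver row is repeated once per device, which is the shape a geometry-per-row dataframe needs.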
Connection parameters
- url - the URL of the Hugr server
- api_key - the API key for the Hugr server (if using API key authentication)
- token - the token for the Hugr server (if using token authentication)
- role - the role to use on the Hugr server (if the user has several roles in the token)
Queries can also be run through the module-level hugr.query function, without creating a client explicitly.
In that case the connection parameters are read from environment variables:
- HUGR_URL - the URL of the Hugr server
- HUGR_API_KEY - the API key for the Hugr server (if using API key authentication)
- HUGR_TOKEN - the token for the Hugr server (if using token authentication)
- HUGR_API_KEY_HEADER - the header name for the API key (if using API key authentication)
- HUGR_ROLE_HEADER - the header name for the role (if the user has several roles in the token)
import hugr
hugr.query(
    query="""
    {
      devices {
        id
        name
        geom
        last_seen {
          time
          value
        }
      }
    }
    """
)
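As an illustration of how the environment variables listed above might be gathered before a query is sent, here is a small standard-library sketch. The function name `settings_from_env` and the keys of the returned dictionary are hypothetical; how hugr-client actually reads and applies these variables is not documented here.

```python
import os
from typing import Mapping, Optional

# Environment variable names taken from the list above; the dictionary keys
# on the left are illustrative names, not part of the hugr-client API.
ENV_NAMES = {
    "url": "HUGR_URL",
    "api_key": "HUGR_API_KEY",
    "token": "HUGR_TOKEN",
    "api_key_header": "HUGR_API_KEY_HEADER",
    "role_header": "HUGR_ROLE_HEADER",
}


def settings_from_env(env: Optional[Mapping[str, str]] = None) -> dict:
    """Collect the Hugr connection settings that are set and non-empty."""
    env = os.environ if env is None else env
    return {key: env[var] for key, var in ENV_NAMES.items() if env.get(var)}


# Pass an explicit mapping here so the example does not depend on the real environment.
demo_env = {"HUGR_URL": "http://localhost:15001/ipc", "HUGR_API_KEY": "secret"}
settings = settings_from_env(demo_env)
print(settings)
```

Accepting the mapping as a parameter keeps the helper easy to test; calling it with no argument falls back to `os.environ`.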
Streaming API
In addition to standard HTTP queries, hugr-client supports asynchronous streaming of data via WebSocket. This allows you to receive large datasets in batches or row-by-row, without waiting for the entire result to be loaded into memory.
Quick Start
import asyncio
from hugr.stream import connect_stream

async def main():
    client = connect_stream("http://localhost:15001/ipc")
    # HTTP query for total count
    result = client.query("query { devices_aggregation { _rows_count } }")
    print("Total devices:", result.record()['_rows_count'])
    # Stream data in batches (Arrow RecordBatch)
    async with await client.stream(
        """
        query {
          devices {
            id
            name
            geom
          }
        }
        """
    ) as stream:
        async for batch in stream.chunks():
            df = batch.to_pandas()
            print("Batch:", len(df), "rows")
    # Stream data row by row
    async with await client.stream(
        "query { devices { id name status } }"
    ) as stream:
        async for row in stream.rows():
            print(row)

asyncio.run(main())
Main Features
- connect_stream — create a streaming client (WebSocket).
- client.stream(query, variables=None) — asynchronously get a stream of Arrow RecordBatch for a GraphQL query.
- stream.chunks() — async generator for batches (RecordBatch).
- stream.rows() — async generator for rows (dict).
- stream.to_pandas() — collect all streamed data into a pandas.DataFrame.
- stream.count() — count the number of rows in the stream.
- stream_data_object(data_object, fields, variables=None) — stream a specific data object and fields.
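The relationship between batch-wise and row-wise iteration in the list above can be shown with plain async generators. This is a conceptual sketch only: `fake_chunks`, `rows_from`, and `count_rows` are hypothetical stand-ins that use lists of dicts instead of Arrow RecordBatch objects, and they say nothing about how hugr-client implements `stream.chunks()`, `stream.rows()`, or `stream.count()`.

```python
import asyncio
from typing import AsyncIterator


async def fake_chunks() -> AsyncIterator[list[dict]]:
    """Stand-in for a batch stream: yields small batches of rows."""
    batches = [
        [{"id": 1, "status": "active"}, {"id": 2, "status": "idle"}],
        [{"id": 3, "status": "active"}],
    ]
    for batch in batches:
        await asyncio.sleep(0)  # give control back to the event loop, as real I/O would
        yield batch


async def rows_from(chunks: AsyncIterator[list[dict]]) -> AsyncIterator[dict]:
    """Row-wise view: unpack each batch and yield its rows one at a time."""
    async for batch in chunks:
        for row in batch:
            yield row


async def count_rows(chunks: AsyncIterator[list[dict]]) -> int:
    """Count rows without keeping any batch in memory after it is processed."""
    total = 0
    async for batch in chunks:
        total += len(batch)
    return total


async def demo() -> tuple[list[dict], int]:
    rows = [row async for row in rows_from(fake_chunks())]
    total = await count_rows(fake_chunks())
    return rows, total


rows, total = asyncio.run(demo())
print(total, [row["id"] for row in rows])
```

Batch iteration suits vectorised processing, while the row view is convenient for filtering or early exit on individual records.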
Example: Collect DataFrame via Streaming
import asyncio
from hugr.stream import connect_stream

async def main():
    client = connect_stream("http://localhost:15001/ipc")
    async with await client.stream(
        "query { devices { id name geom } }"
    ) as stream:
        df = await stream.to_pandas()
        print(df.head())

asyncio.run(main())
Example: Row-by-row Processing
import asyncio
from hugr.stream import connect_stream

async def main():
    client = connect_stream()
    async with await client.stream(
        "query { devices { id name status } }"
    ) as stream:
        async for row in stream.rows():
            if row.get("status") == "active":
                print("Active device:", row["name"])

asyncio.run(main())
Example: Query Cancellation
import asyncio
from hugr.stream import connect_stream

async def main():
    client = connect_stream()
    async with await client.stream(
        "query { devices { id name } }"
    ) as stream:
        count = 0
        async for batch in stream.chunks():
            count += batch.num_rows
            if count > 1000:
                await client.cancel_current_query()
                break

asyncio.run(main())
Notes
- All streaming functions are asynchronous and require async/await.
- Dependencies: websockets, pyarrow, pandas.
- You can use both a pure streaming client and an enhanced client with HTTP and WebSocket support.
See more in hugr/stream.py and the code examples in the source files.
License
This project is licensed under the MIT License - see the LICENSE file for details.
Contributing
Contributions are welcome!
Dependencies
- requests
- pyarrow
- pandas
- geopandas
- shapely
- requests_toolbelt
- numpy