Python SDK for Arkiv networks - Web3.py + Entities
Project description
Arkiv SDK
Arkiv is a permissioned storage system for decentralized apps, supporting flexible entities with binary data, attributes, and metadata.
The Arkiv SDK is the official Python library for interacting with Arkiv networks. It offers a type-safe, developer-friendly API for managing entities, querying data, subscribing to events, and offchain verification—ideal for both rapid prototyping and production use.
Architecture
Principles:
- The SDK is based on a modern and stable client library.
- The SDK should feel like "Library + Entities"
As underlying library we use Web3.py (no good alternatives for Python).
Arkiv Client
The Arkiv SDK should feel like "web3.py + entities", maintaining the familiar developer experience that Python web3 developers expect.
A client.arkiv.* approach is in line with web3.py's module pattern.
It clearly communicates that arkiv is a module extension just like eth, net, etc.
Hello World
Synchronous API
Here's a "Hello World!" example showing how to use the Python Arkiv SDK:
from arkiv import Arkiv
# Create Arkiv client with default settings:
# - starting and connecting to a containerized Arkiv node
# - creating a funded default account
client = Arkiv()
print(f"Client: {client}, connected: {client.is_connected()}")
print(f"Account: {client.eth.default_account}")
print(f"Balance: {client.from_wei(client.eth.get_balance(client.eth.default_account), 'ether')} ETH")
# Create entity with data and attributes
entity_key, receipt = client.arkiv.create_entity(
payload = b"Hello World!",
content_type = "text/plain",
attributes = {"type": "greeting", "version": 1},
expires_in = client.arkiv.to_seconds(days=1)
)
# Get individual entity and print its details
entity = client.arkiv.get_entity(entity_key)
print(f"Creation TX: {receipt.tx_hash}")
print(f"Entity: {entity}")
Asynchronous API
For async/await support, use AsyncArkiv:
import asyncio
from arkiv import AsyncArkiv
async def main():
# Create async client with default settings
async with AsyncArkiv() as client:
# Create entity with data and attributes
entity_key, tx_hash = await client.arkiv.create_entity(
payload = b"Hello Async World!",
content_type = "text/plain",
attributes = {"type": "greeting", "version": 1},
expires_in = client.arkiv.to_seconds(days=1)
)
# Get entity and check existence
entity = await client.arkiv.get_entity(entity_key)
exists = await client.arkiv.entity_exists(entity_key)
asyncio.run(main())
Web3 Standard Support
from web3 import HTTPProvider
provider = HTTPProvider('https://mendoza.hoodi.arkiv.network/rpc')
# Arkiv 'is a' Web3 client
client = Arkiv(provider)
balance = client.eth.get_balance(client.eth.default_account)
tx = client.eth.get_transaction(tx_hash)
Entity Operations
Beyond creating and reading entities, Arkiv supports updating, extending, transferring ownership, and deleting entities.
Update Entity
Modify an entity's payload, attributes, or expiration:
# Update entity with new payload and attributes
entity_key, receipt = client.arkiv.update_entity(
entity_key,
payload=b"Updated content",
attributes={"type": "greeting", "version": 2},
expires_in=client.arkiv.to_seconds(days=7)
)
Extend Entity Lifetime
Extend an entity's expiration without modifying its content:
# Extend entity lifetime by 30 days
entity_key, receipt = client.arkiv.extend_entity(
entity_key,
extend_by=client.arkiv.to_seconds(days=30)
)
Change Entity Owner
Transfer ownership of an entity to another address:
# Transfer entity to a new owner
new_owner = "0x1234567890abcdef1234567890abcdef12345678"
entity_key, receipt = client.arkiv.change_owner(entity_key, new_owner)
Delete Entity
Permanently remove an entity (only the owner can delete):
# Delete entity
receipt = client.arkiv.delete_entity(entity_key)
Advanced Features
Query Builder
The query builder provides a clean, chainable API for querying entities. It wraps the lower-level query_entities method with a SQL-like interface.
Basic Usage
from arkiv import Arkiv, StrAttr
from arkiv.types import KEY, ATTRIBUTES
client = Arkiv()
# Define typed attributes
entity_type = StrAttr("type")
status = StrAttr("status")
# Simple query - select all fields
results = client.arkiv.select().where(entity_type == "user").fetch()
for entity in results:
print(f"Entity: {entity.key}")
# Select specific fields
results = client.arkiv.select(KEY, ATTRIBUTES).where(status == "active").fetch()
# Count matching entities
count = client.arkiv.select().where(entity_type == "user").count()
print(f"Found {count} users")
Expressions with IntAttr/StrAttr
The expression builder generates SQL-like query strings under the hood, providing a type-safe Python API for constructing filter conditions. The .where() method accepts either an Expr object from the expression builder or a raw SQL-like query string (see Query Language below).
For dynamic query building with runtime type checking, use the expression builder:
from arkiv import Arkiv, IntAttr, StrAttr, IntSort, DESC
client = Arkiv()
# Define typed attributes
age = IntAttr("age")
status = StrAttr("status")
role = StrAttr("role")
# Build expressions with operators
results = client.arkiv.select() \
.where((age >= 18) & (status == "active")) \
.order_by(IntSort("age", DESC)) \
.fetch()
# Complex expressions with OR and AND
results = client.arkiv.select() \
.where((role == "admin") | (role == "moderator") & (status == "active")) \
.fetch()
# NOT operator
results = client.arkiv.select() \
.where((age >= 18) & ~(status == "banned")) \
.fetch()
# Type checking catches errors early
age == "18" # TypeError: IntAttr 'age' requires int, got str
status == 1 # TypeError: StrAttr 'status' requires str, got int
Expression Operators:
&- AND|- OR~- NOT
Note: Always use parentheses around comparisons when combining with &, |, or ~ due to Python operator precedence.
Sorting with IntSort/StrSort
Use type-specific sort classes for ORDER BY clauses:
from arkiv import Arkiv, IntSort, StrSort, StrAttr, DESC
client = Arkiv()
# Define typed attributes
entity_type = StrAttr("type")
status = StrAttr("status")
# Define sorting_criteria
status_asc = StrSort("status")
age_desc = IntSort("age", DESC)
# Sort by age descending
results = client.arkiv.select() \
.where(entity_type == "user") \
.order_by(age_desc) \
.fetch()
# Multi-field sorting: status ascending, then age descending
results = client.arkiv.select() \
.where(status == "active") \
.order_by(status_asc, age_desc) \
.fetch()
Limiting Results
Use .limit() to restrict the total number of results and .max_page_size() to control pagination:
from arkiv import Arkiv, IntSort, StrAttr, DESC
client = Arkiv()
# Define typed attribute
entity_type = StrAttr("type")
# Get first 10 matching entities
results = client.arkiv.select() \
.where(entity_type == "user") \
.limit(10) \
.fetch()
# Top 5 users by age
results = client.arkiv.select() \
.where(entity_type == "user") \
.order_by(IntSort("age", DESC)) \
.limit(5) \
.fetch()
# Control page size for large entities (smaller pages = less memory per request)
results = client.arkiv.select() \
.where(entity_type == "document") \
.max_page_size(10) \
.fetch()
# Combine limit and page size
results = client.arkiv.select() \
.where(entity_type == "user") \
.limit(100) \
.max_page_size(25) \
.fetch()
Batch Operations
Batch operations allow you to group multiple entity operations (create, update, extend, delete, change_owner) into a single atomic transaction. This is more efficient and ensures all operations either succeed or fail together.
Basic Usage
from arkiv import Arkiv
client = Arkiv()
# Using context manager (recommended)
with client.arkiv.batch() as batch:
batch.create_entity(payload=b"item 1", expires_in=3600)
batch.create_entity(payload=b"item 2", expires_in=3600)
batch.create_entity(payload=b"item 3", expires_in=3600)
# Batch is automatically executed on exit
print(f"Created {len(batch.receipt.creates)} entities")
# Access created entity keys
for create_event in batch.receipt.creates:
print(f"Created: {create_event.key}")
Loop-Based Creation
Batch operations work naturally with loops:
items = [
{"name": "alice", "role": "admin"},
{"name": "bob", "role": "user"},
{"name": "charlie", "role": "user"},
]
with client.arkiv.batch() as batch:
for item in items:
batch.create_entity(
payload=item["name"].encode(),
attributes={"role": item["role"]},
expires_in=3600,
)
print(f"Created {len(batch.receipt.creates)} users")
Mixed Operations
A single batch can contain different operation types:
with client.arkiv.batch() as batch:
# Create new entities
batch.create_entity(payload=b"new item", expires_in=3600)
# Update existing entities
batch.update_entity(existing_key, payload=b"updated", expires_in=3600)
# Extend entity lifetime
batch.extend_entity(another_key, extend_by=7200)
# Change ownership
batch.change_owner(some_key, new_owner_address)
# Delete entities
batch.delete_entity(old_key)
# Check results
print(f"Creates: {len(batch.receipt.creates)}")
print(f"Updates: {len(batch.receipt.updates)}")
print(f"Extensions: {len(batch.receipt.extensions)}")
print(f"Deletes: {len(batch.receipt.deletes)}")
Manual Execution
For more control, you can execute batches manually:
batch = client.arkiv.batch()
batch.create_entity(payload=b"data", expires_in=3600)
batch.create_entity(payload=b"more data", expires_in=3600)
# Execute explicitly
receipt = batch.execute()
print(f"Transaction: {receipt.tx_hash}")
Async Support
Batch operations work with AsyncArkiv:
async with AsyncArkiv() as client:
async with client.arkiv.batch() as batch:
batch.create_entity(payload=b"async item 1", expires_in=3600)
batch.create_entity(payload=b"async item 2", expires_in=3600)
print(f"Created {len(batch.receipt.creates)} entities")
Error Handling
- If an exception occurs inside the context manager, the batch is not executed
- Empty batches are silently skipped (no-op)
- All operations in a batch are atomic: if any operation fails, the entire batch is rolled back
try:
with client.arkiv.batch() as batch:
batch.create_entity(payload=b"item 1", expires_in=3600)
raise ValueError("Something went wrong")
batch.create_entity(payload=b"item 2", expires_in=3600)
except ValueError:
pass
# Batch was not executed - no entities created
assert batch.receipt is None
Provider Builder
The ProviderBuilder provides a fluent API for creating providers to connect to various Arkiv networks:
from arkiv import Arkiv
from arkiv.account import NamedAccount
from arkiv.provider import ProviderBuilder
# Create account from wallet json
with open('wallet_bob.json', 'r') as f:
wallet = f.read()
bob = NamedAccount.from_wallet('Bob', wallet, 's3cret')
# Initialize Arkiv client connected to Kaolin (Arkiv testnet)
provider = ProviderBuilder().kaolin().build()
client = Arkiv(provider, account=bob)
# Additional builder examples
provider_custom = ProviderBuilder().custom("https://mendoza.hoodi.arkiv.network/rpc").build()
provider_container = ProviderBuilder().node().build()
provider_kaolin_ws = ProviderBuilder().kaolin().ws().build()
Query Language
Arkiv uses a SQL-like query language to filter and retrieve entities based on their attributes. The query language supports standard comparison operators, logical operators, and parentheses for complex conditions.
Supported Operators
Comparison Operators:
=- Equal to!=- Not equal to>- Greater than>=- Greater than or equal to<- Less than<=- Less than or equal to
Logical Operators:
AND- Logical ANDOR- Logical ORNOT- Logical NOT (can also use!=)
Parentheses can be used to group conditions and control evaluation order.
Query Examples
from arkiv import Arkiv
client = Arkiv()
# Simple equality
query = 'type = "user"'
entities = list(client.arkiv.query_entities(query))
# Note that inn the examples below the call to query_entities is omitted
# Multiple conditions with AND
query = 'type = "user" AND status = "active"'
# OR conditions with parentheses
query = 'type = "user" AND (status = "active" OR status = "pending")'
# Comparison operators
query = 'type = "user" AND age >= 18 AND age < 65'
# NOT conditions
query = 'type = "user" AND status != "deleted"'
# Alternative NOT syntax
query = 'type = "user" AND NOT (status = "deleted")'
# Complex nested conditions
query = '(type = "user" OR type = "admin") AND (age >= 18 AND age <= 65)'
# Multiple NOT conditions
query = 'type = "user" AND status != "deleted" AND status != "banned"'
# Pattern matching with GLOB (using * as wildcard)
query = 'name GLOB "John*"' # Names starting with "John"
# Pattern matching with suffix
query = 'email GLOB "*@example.com"' # Emails ending with @example.com
Note: String values in queries must be enclosed in double quotes ("). Numeric values do not require quotes. The GLOB operator supports pattern matching using * as a wildcard character.
Note that the GLOB operator might be replace by a SQL standard LIKE operator in the future.
Watch Entity Events
Arkiv provides near real-time event monitoring for entity lifecycle changes. You can watch for entity creation, updates, extensions, deletions, and ownership changes using callback-based event filters.
Available Event Types
watch_entity_created- Monitor when new entities are createdwatch_entity_updated- Monitor when entities are updatedwatch_entity_extended- Monitor when entity lifetimes are extendedwatch_entity_deleted- Monitor when entities are deletedwatch_owner_changed- Monitor when entity ownership changes
Basic Usage
from arkiv import Arkiv
client = Arkiv()
# Define callback function to handle events
def on_entity_created(event, tx_hash):
print(f"New entity created: {event.key}")
print(f"Owner: {event.owner}")
print(f"Transaction: {tx_hash}")
# Start watching for entity creation events
event_filter = client.arkiv.watch_entity_created(on_entity_created)
# Create an entity - callback will be triggered
entity_key, _ = client.arkiv.create_entity(
payload=b"Hello World",
attributes={"type": "greeting"}
)
# Stop watching when done
event_filter.stop()
event_filter.uninstall()
Watching Multiple Event Types
created_events = []
updated_events = []
deleted_events = []
def on_created(event, tx_hash):
created_events.append((event, tx_hash))
def on_updated(event, tx_hash):
updated_events.append((event, tx_hash))
def on_deleted(event, tx_hash):
deleted_events.append((event, tx_hash))
# Watch multiple event types simultaneously
filter_created = client.arkiv.watch_entity_created(on_created)
filter_updated = client.arkiv.watch_entity_updated(on_updated)
filter_deleted = client.arkiv.watch_entity_deleted(on_deleted)
# Perform operations...
# Events are captured in real-time
# Cleanup all filters
filter_created.uninstall()
filter_updated.uninstall()
filter_deleted.uninstall()
Historical Events
By default, watchers only capture new events from the current block forward. You can also watch from a specific historical block:
# Watch from a specific block number
event_filter = client.arkiv.watch_entity_created(
on_entity_created,
from_block=1000
)
# Watch from the beginning of the chain
event_filter = client.arkiv.watch_entity_created(
on_entity_created,
from_block=0
)
Automatic Cleanup
When using Arkiv as a context manager, all event filters are automatically cleaned up on exit:
with Arkiv() as client:
# Create event filters
filter1 = client.arkiv.watch_entity_created(callback1)
filter2 = client.arkiv.watch_entity_updated(callback2)
# Perform operations...
# Filters are automatically stopped and uninstalled when exiting context
You can also manually clean up all active filters:
client.arkiv.cleanup_filters()
Note: Event watching requires polling the node for new events. The SDK handles this automatically in the background.
Development Guide
Setup
Requirements
- Python: Version 3.10 or higher
- Install:
pip install --pre arkiv-sdkpip install testcontainers websockets
- RPC:
https://mendoza.hoodi.arkiv.network/rpc
Branches, Versions, Changes
Branches
The current stable branch on Git is main.
Currently main hosts the initial SDK implementation.
The branch v1-dev hosts the future V1.0 SDK release.
Versions
For version management the uv package and project manger is used. Use the command below to display the current version
uv version
SDK versions are tracked in the following files:
pyproject.tomluv.lock
Testing
Pytest is used for unit and integration testing.
uv run pytest # Run all tests
uv run pytest -k test_create_entity_simple --log-cli-level=info # Specific tests via keyword, print at info log level
If an .env file is present the unit tests are run against the specifice RPC coordinates and test accounts.
An example wallet file is provided in .env.testing
Make sure that the specified test accounts are properly funded before running the tests.
Otherwise, the tests are run against a testcontainer containing an Arkiv RPC Node. Test accounts are created on the fly and using the CLI inside the local RPC Nonde.
Account wallets for such tests can be created via the command shown below.
The provided example creates the wallet file wallet_alice.json using the password provided during the execution of the command.
uv run python -m arkiv.account alice
Code Quality
This project uses comprehensive unit testing, linting and type checking to maintain high code quality:
Quick Commands
Before any commit run quality checks:
./scripts/check-all.sh
Tools Used
- MyPy: Static type checker with strict configuration
- Ruff: Fast linter and formatter (replaces black, isort, flake8, etc.)
- Pre-commit: Automated quality checks on git commits
Individual commands
uv run ruff check . --fix # Lint and auto-fix
uv run ruff format . # Format code
uv run mypy src/ tests/ # Type check
uv run pytest tests/ -v # Run tests
uv run pytest --cov=src # Run code coverage
uv run pre-commit run --all-files # Manual pre commit checks
Pre-commit Hooks
Pre-commit hooks run automatically on git commit and will:
- Fix linting issues with ruff
- Format code consistently
- Run type checking with mypy
- Check file formatting (trailing whitespace, etc.)
MyPy Settings
strict = true- Enable all strict checksno_implicit_reexport = true- Require explicit re-exportswarn_return_any = true- Warn about returning Any values- Missing imports are ignored for third-party libraries without type stubs
Ruff Configuration
- Use 88 character line length (Black-compatible)
- Target Python 3.10+ features
- Enable comprehensive rule sets (pycodestyle, pyflakes, isort, etc.)
- Auto-fix issues where possible
- Format with double quotes and trailing commas
Alias
function gl { git log --format="%C(green)%ad%C(reset) %C(yellow)%h%C(reset)%C(auto)%d%C(reset) %s" --date=format:"%Y-%m-%d_%H:%M:%S" -n ${1:-10}; }
alias gs='git status'
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file arkiv_sdk-1.0.0b3.dev0.tar.gz.
File metadata
- Download URL: arkiv_sdk-1.0.0b3.dev0.tar.gz
- Upload date:
- Size: 126.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
5d8c8ddb0752d6699e4aedad5f5fc0bd71fb580ee252cc163e9caec5a144ae8c
|
|
| MD5 |
f142c5b45c211fa9413ce20b266f6258
|
|
| BLAKE2b-256 |
b126f21a46683e0cdad22ffa8f757e04d5421d57def7f59efbe160f89a4d579f
|
File details
Details for the file arkiv_sdk-1.0.0b3.dev0-py3-none-any.whl.
File metadata
- Download URL: arkiv_sdk-1.0.0b3.dev0-py3-none-any.whl
- Upload date:
- Size: 69.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
297ac6fcca1f77dcb621bf7ce9e718c52c4f049910b76ec440b546303891940d
|
|
| MD5 |
2c8b66fc7d16d13c1cab0b2bce187eaa
|
|
| BLAKE2b-256 |
dafe2b1d0389e75f1e07cccb7d5b472459f90256152fef38ad55e9b7727489bf
|