
Python client for the DeFiStream API


DeFiStream Python Client

Official Python client for the DeFiStream API.

Getting an API Key

To use the DeFiStream API, you need to sign up for an account at defistream.dev to obtain your API key.

Installation

pip install defistream

This installs pandas and pyarrow by default, for DataFrame and Parquet support.

With polars support (in addition to pandas):

pip install "defistream[polars]"

Quick Start

from defistream import DeFiStream

# Initialize client (reads DEFISTREAM_API_KEY from environment if not provided)
client = DeFiStream()

# Or with explicit API key
client = DeFiStream(api_key="dsk_your_api_key")

# Query ERC20 transfers using builder pattern
df = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21010000)
    .as_df()
)

print(df.head())

Features

  • Builder pattern: Fluent query API with chainable methods
  • Aggregate queries: Bucket events into time or block intervals with summary statistics
  • Type-safe: Full type hints and Pydantic models
  • Multiple formats: DataFrame (pandas/polars), CSV, Parquet, JSON
  • Async support: Native async/await with AsyncDeFiStream
  • Protocol coverage: ERC20, Native tokens, AAVE V3, Uniswap V3, Lido, Stader, Threshold

Supported Protocols

Protocol     | Events
------------ | ------
ERC20        | transfers
Native Token | transfers
AAVE V3      | deposits, withdrawals, borrows, repays, flashloans, liquidations
Uniswap V3   | swaps, deposits, withdrawals, collects
Lido         | deposits, withdrawal_requests, withdrawals_claimed, l2_deposits, l2_withdrawal_requests
Stader       | deposits, withdrawal_requests, withdrawals
Threshold    | deposit_requests, deposits, withdrawal_requests, withdrawals

Usage Examples

Builder Pattern

The client uses a fluent builder pattern. The query is only executed when you call a terminal method like as_df(), as_file(), or as_dict().

from defistream import DeFiStream

client = DeFiStream()

# Build query step by step
query = client.erc20.transfers("USDT")
query = query.network("ETH")
query = query.block_range(21000000, 21010000)
query = query.min_amount(1000)

# Execute and get DataFrame
df = query.as_df()

# Or chain everything
df = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21010000)
    .min_amount(1000)
    .as_df()
)
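The deferred execution described above can be illustrated with a toy builder. This is a minimal sketch, not the DeFiStream implementation: the ToyQuery class, its execute() method, the injected send callable, and the parameter names block_start and block_end are all hypothetical. Each chained call returns a new immutable builder that only records parameters; nothing is sent until the terminal step runs.

```python
from __future__ import annotations

from dataclasses import dataclass, field, replace
from typing import Any, Callable, Dict


@dataclass(frozen=True)
class ToyQuery:
    """Hypothetical stand-in for a query builder; not the DeFiStream class."""

    endpoint: str
    params: Dict[str, Any] = field(default_factory=dict)

    def _with(self, **updates: Any) -> ToyQuery:
        # Return a new builder with merged parameters; no request is made here.
        return replace(self, params={**self.params, **updates})

    def network(self, net: str) -> ToyQuery:
        return self._with(network=net)

    def block_range(self, start: int, end: int) -> ToyQuery:
        return self._with(block_start=start, block_end=end)

    def min_amount(self, amount: float) -> ToyQuery:
        return self._with(min_amount=amount)

    def execute(self, send: Callable[[str, Dict[str, Any]], Any]) -> Any:
        # Terminal step: the only place a request is issued.
        return send(self.endpoint, self.params)
```

Because each step returns a new object in this sketch, a partially built query can serve as a template for several variants. Whether the real builder copies or mutates on each call is not stated here, so reassigning the result, as in the step-by-step example, is the safe pattern.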

ERC20 Transfers

# Get USDT transfers over 10,000 USDT
df = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21010000)
    .min_amount(10000)
    .as_df()
)

# Query multiple tokens at once (known symbols only, not contract addresses)
df = (
    client.erc20.transfers("USDT", "USDC", "DAI")
    .network("ETH")
    .block_range(21000000, 21010000)
    .as_df()
)

# Or set multiple tokens via chain method
df = (
    client.erc20.transfers()
    .token("USDT", "USDC")
    .network("ETH")
    .block_range(21000000, 21010000)
    .as_df()
)

# Filter by sender
df = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21010000)
    .sender("0x28c6c06298d514db089934071355e5743bf21d60")
    .as_df()
)

AAVE Events

# Get deposits
df = (
    client.aave_v3.deposits()
    .network("ETH")
    .block_range(21000000, 21010000)
    .as_df()
)

# Use a specific market type on ETH (Core, Prime, or EtherFi)
df = (
    client.aave_v3.deposits()
    .network("ETH")
    .block_range(21000000, 21010000)
    .eth_market_type("Prime")
    .as_df()
)

Uniswap Swaps

# Get swaps for WETH/USDC pool with 0.05% fee tier
df = (
    client.uniswap_v3.swaps("WETH", "USDC", 500)
    .network("ETH")
    .block_range(21000000, 21010000)
    .as_df()
)

# Or build with chain methods
df = (
    client.uniswap_v3.swaps()
    .symbol0("WETH")
    .symbol1("USDC")
    .fee(500)
    .network("ETH")
    .block_range(21000000, 21010000)
    .as_df()
)
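Uniswap V3 expresses fee tiers in hundredths of a basis point, so the 500 above means a 0.05% pool and 3000 means 0.3%. The helper below is an illustrative conversion only; fee_tier_to_percent is a hypothetical name, not part of the client, and the accepted tiers are taken from the Protocol-Specific Parameters table.

```python
# Fee tiers accepted by .fee(), per the Protocol-Specific Parameters table.
VALID_FEE_TIERS = (100, 500, 3000, 10000)


def fee_tier_to_percent(fee: int) -> float:
    """Convert a Uniswap V3 fee tier (hundredths of a basis point) to a percentage.

    One basis point is 0.01%, so one unit is 0.0001% and the tier divides by 10,000.
    """
    if fee not in VALID_FEE_TIERS:
        raise ValueError(f"unsupported fee tier {fee}; expected one of {VALID_FEE_TIERS}")
    return fee / 10_000


for tier in VALID_FEE_TIERS:
    print(f"fee({tier}) -> {fee_tier_to_percent(tier)}% pool")
```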

Native Token Transfers

# Get ETH transfers >= 1 ETH
df = (
    client.native_token.transfers()
    .network("ETH")
    .block_range(21000000, 21010000)
    .min_amount(1.0)
    .as_df()
)

Label & Category Filters

# Get USDT transfers involving Binance wallets
df = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21010000)
    .involving_label("Binance")
    .as_df()
)

# Get USDT transfers FROM exchanges TO DeFi protocols
df = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21010000)
    .sender_category("exchange")
    .receiver_category("defi")
    .as_df()
)

# Get AAVE deposits involving exchange addresses
df = (
    client.aave_v3.deposits()
    .network("ETH")
    .block_range(21000000, 21010000)
    .involving_category("exchange")
    .as_df()
)

# Get native ETH transfers FROM Binance or Coinbase (multi-value)
df = (
    client.native_token.transfers()
    .network("ETH")
    .block_range(21000000, 21010000)
    .sender_label("Binance,Coinbase")
    .as_df()
)

Aggregate Queries

Use .aggregate() to bucket raw events into time or block intervals with summary statistics. All existing filter methods can be chained before the .aggregate() call.

# Aggregate USDT transfers into 2-hour buckets
df = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21100000)
    .aggregate(group_by="time", period="2h")
    .as_df()
)

# Aggregate by block intervals
df = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21100000)
    .aggregate(group_by="block", period="100b")
    .as_df()
)

# Combine with filters — large transfers from exchanges, bucketed hourly
df = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21100000)
    .sender_category("exchange")
    .min_amount(10000)
    .aggregate(group_by="time", period="1h")
    .as_df()
)

# Aggregate Uniswap swaps
df = (
    client.uniswap_v3.swaps("WETH", "USDC", 500)
    .network("ETH")
    .block_range(21000000, 21100000)
    .aggregate(group_by="time", period="1h")
    .as_df()
)

You can also discover what aggregate fields are available for a protocol:

schema = client.aggregate_schema("erc20")
print(schema)
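To make the bucketing concrete, the sketch below reproduces a time aggregation and a block aggregation locally with pandas on a handful of toy events. It only approximates what the server computes: the column names (time, block_number, amount), the count/sum statistics, and the bucket alignment (flooring to a multiple of the period) are assumptions. Use client.aggregate_schema() to see the fields the API actually returns.

```python
import pandas as pd

# Toy events for illustration only (not real chain data).
events = pd.DataFrame(
    {
        "time": pd.to_datetime(
            ["2024-11-01 00:10", "2024-11-01 01:50", "2024-11-01 02:05", "2024-11-01 05:30"]
        ),
        "block_number": [21000005, 21000099, 21000150, 21000420],
        "amount": [100.0, 250.0, 50.0, 400.0],
    }
)

# group_by="time", period="2h": floor each timestamp to a 2-hour boundary.
time_agg = events.groupby(events["time"].dt.floor("2h"))["amount"].agg(["count", "sum"])

# group_by="block", period="100b": floor each block number to a multiple of 100.
block_agg = events.groupby(events["block_number"] // 100 * 100)["amount"].agg(["count", "sum"])

print(time_agg)
print(block_agg)
```

Note that groupby only emits buckets that contain at least one event; whether the API also returns empty buckets is not specified here.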

Verbose Mode

By default, responses omit metadata fields to reduce payload size. Use .verbose() to include all fields:

# Default: compact response (no tx_hash, tx_id, log_index, network, name)
df = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21010000)
    .as_df()
)

# Verbose: includes all metadata fields
df = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21010000)
    .verbose()
    .as_df()
)

Value Enrichment

Use .with_value() to enrich events with USD value data. For individual events, this adds a value_usd column (amount × price). For aggregate queries, it adds an agg_value_usd column containing the sum of values in each bucket.

Supported protocols: AAVE, Uniswap, Lido, Stader, ERC20, Native Token.

# Individual events with value data
df = (
    client.aave_v3.deposits()
    .network("ETH")
    .block_range(21000000, 21010000)
    .with_value()
    .as_df()
)
# df now includes 'value_usd' column

# Aggregate with value — adds agg_value_usd column
df = (
    client.aave_v3.deposits()
    .network("ETH")
    .block_range(21000000, 21100000)
    .with_value()
    .aggregate(group_by="time", period="2h")
    .as_df()
)
# df now includes 'agg_value_usd' column
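The relationship between the two columns can be sketched locally. This assumes a per-event USD price is available (price_usd is a hypothetical column; the API returns value_usd directly), computes value_usd as amount × price, and sums it per 2-hour bucket to mimic agg_value_usd.

```python
import pandas as pd

# Toy events; price_usd is a hypothetical column standing in for the price
# the API applies server-side.
events = pd.DataFrame(
    {
        "time": pd.to_datetime(["2024-11-01 00:15", "2024-11-01 01:40", "2024-11-01 03:05"]),
        "amount": [2.0, 0.5, 1.0],
        "price_usd": [2500.0, 2600.0, 2550.0],
    }
)

# Per-event enrichment: value_usd = amount * price.
events["value_usd"] = events["amount"] * events["price_usd"]

# Aggregate enrichment: agg_value_usd = sum of value_usd within each 2-hour bucket.
agg = events.groupby(events["time"].dt.floor("2h")).agg(agg_value_usd=("value_usd", "sum"))

print(events[["time", "value_usd"]])
print(agg)
```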

Return as DataFrame

# As pandas DataFrame (default)
df = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21010000)
    .as_df()
)

# As polars DataFrame
df = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21010000)
    .as_df("polars")
)

Save to File

The format is inferred from the file extension:

# Save as Parquet (recommended for large datasets)
(
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21100000)
    .as_file("transfers.parquet")
)

# Save as CSV
(
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21100000)
    .as_file("transfers.csv")
)

# Save as JSON
(
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21010000)
    .as_file("transfers.json")
)
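The extension-based dispatch can be sketched as a small lookup. infer_format is a hypothetical helper that mirrors the documented behavior for .parquet, .csv and .json, with an explicit format taking precedence as in as_file(path, format="csv"). How the real client treats unknown extensions is not documented here, so raising an error is an assumption.

```python
from pathlib import Path
from typing import Optional

# Extension -> format, covering the three extensions used in the examples above.
_FORMATS_BY_SUFFIX = {".parquet": "parquet", ".csv": "csv", ".json": "json"}


def infer_format(path: str, format: Optional[str] = None) -> str:
    """Hypothetical helper: an explicit format wins; otherwise use the extension."""
    if format is not None:
        return format
    suffix = Path(path).suffix.lower()
    if suffix not in _FORMATS_BY_SUFFIX:
        raise ValueError(f"cannot infer format from {path!r}; pass format= explicitly")
    return _FORMATS_BY_SUFFIX[suffix]
```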

Get Download Link

Request a shareable download link instead of the data itself. This is useful for passing results to other tools or libraries:

from defistream import DeFiStream

client = DeFiStream()

# Get a download link (CSV format by default)
link_info = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21100000)
    .as_link()
)

print(link_info.filename)  # erc20_transfer_ETH_21000000_21100000.csv
print(link_info.link)      # https://dl.defistream.dev/dh/abc123/...
print(link_info.expiry)    # 2026-02-03 15:30:00
print(link_info.size)      # 1.29 MB

# Get as Parquet link
link_info = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21100000)
    .as_link(format="parquet")
)

# Use with polars (reads directly from URL)
import polars as pl
df = pl.read_parquet(link_info.link)

# Use with pandas
import pandas as pd
df = pd.read_parquet(link_info.link)

Note: Links expire after 1 hour. The as_link() method only supports csv and parquet formats.
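Since links expire, code that caches a link_info may want to check it before reuse. The sketch below assumes expiry is either a datetime or a string in the "YYYY-MM-DD HH:MM:SS" form printed above, and that naive times are UTC; neither is confirmed by this document. link_is_fresh and its five-minute safety margin are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Union


def link_is_fresh(
    expiry: Union[datetime, str],
    now: Optional[datetime] = None,
    margin: timedelta = timedelta(minutes=5),
) -> bool:
    """Return True if a download link stays valid for at least `margin`.

    Assumptions: string expiries look like "2026-02-03 15:30:00" and
    naive times are UTC.
    """
    if isinstance(expiry, str):
        expiry = datetime.strptime(expiry, "%Y-%m-%d %H:%M:%S")
    if expiry.tzinfo is None:
        expiry = expiry.replace(tzinfo=timezone.utc)
    if now is None:
        now = datetime.now(timezone.utc)
    return now + margin < expiry
```

For example, `if not link_is_fresh(link_info.expiry):` could trigger a fresh call to `.as_link(format="parquet")` before handing the URL to pandas or polars.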

Estimate Query Cost

Preview a query's cost, in blocks, before executing it. No quota is deducted for the estimate.

# Build a query as usual, then call calculate_cost() instead of as_df()
estimate = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21010000)
    .calculate_cost()
)

print(estimate.cost)                  # 10000
print(estimate.quota_remaining)       # 500000
print(estimate.quota_remaining_after) # 490000

# Also works on aggregate queries
estimate = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21100000)
    .aggregate(group_by="time", period="1h")
    .calculate_cost()
)

Return as Dictionary (JSON)

For small queries, you can get results as a list of dictionaries:

transfers = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21010000)
    .as_dict()
)

for transfer in transfers:
    print(f"{transfer['sender']} -> {transfer['receiver']}: {transfer['amount']}")

Note: as_dict() and as_file("*.json") use the JSON format, which has a 10,000-block limit. For larger block ranges, use as_df() or as_file() with a .parquet or .csv extension; these have no block limit.
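If you do need JSON for a larger range, one option is to split it into windows of at most 10,000 blocks and concatenate the results. The block_chunks helper below is a hypothetical sketch that assumes end_block is exclusive, consistent with the cost example where block_range(21000000, 21010000) costs 10,000 blocks. If the API treats end_block as inclusive, reduce each window's end by one to avoid fetching boundary blocks twice.

```python
from typing import Iterator, Tuple

# Per-request block limit for JSON output, as stated in the note above.
JSON_BLOCK_LIMIT = 10_000


def block_chunks(start: int, end: int, size: int = JSON_BLOCK_LIMIT) -> Iterator[Tuple[int, int]]:
    """Yield consecutive (chunk_start, chunk_end) windows covering [start, end).

    Assumes end_block is exclusive; each window spans at most `size` blocks.
    """
    if size <= 0:
        raise ValueError("size must be positive")
    for chunk_start in range(start, end, size):
        yield chunk_start, min(chunk_start + size, end)


# Usage with the client (requires an API key, so left commented out):
# rows = []
# for s, e in block_chunks(21000000, 21035000):
#     rows.extend(
#         client.erc20.transfers("USDT").network("ETH").block_range(s, e).as_dict()
#     )
```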

Context Manager

Both sync and async clients support context managers to automatically close connections:

# Sync
with DeFiStream() as client:
    df = (
        client.erc20.transfers("USDT")
        .network("ETH")
        .block_range(21000000, 21010000)
        .as_df()
    )

Async Usage

import asyncio
from defistream import AsyncDeFiStream

async def main():
    async with AsyncDeFiStream() as client:
        df = await (
            client.erc20.transfers("USDT")
            .network("ETH")
            .block_range(21000000, 21010000)
            .as_df()
        )
        print(f"Found {len(df)} transfers")

asyncio.run(main())
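The async client is most useful when several queries run concurrently. The sketch below fans out one query per token with asyncio.gather and caps concurrency with a semaphore to reduce the chance of hitting rate limits. fetch_transfers is a hypothetical stand-in that sleeps instead of calling the API so the snippet runs offline; in real code its body would be an awaited builder chain like the one in main() above.

```python
import asyncio
from typing import Dict, List


async def fetch_transfers(token: str) -> List[dict]:
    """Hypothetical stand-in for an awaited query such as
    client.erc20.transfers(token).network("ETH").block_range(...).as_dict()."""
    await asyncio.sleep(0.01)  # simulate network latency
    return [{"token": token}]


async def fetch_many(tokens: List[str], max_concurrency: int = 2) -> Dict[str, List[dict]]:
    # The semaphore allows at most `max_concurrency` queries in flight at once.
    semaphore = asyncio.Semaphore(max_concurrency)

    async def limited(token: str) -> List[dict]:
        async with semaphore:
            return await fetch_transfers(token)

    # gather() runs the coroutines concurrently and returns results in input order.
    results = await asyncio.gather(*(limited(t) for t in tokens))
    return dict(zip(tokens, results))


if __name__ == "__main__":
    print(asyncio.run(fetch_many(["USDT", "USDC", "DAI"])))
```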

List Available Decoders

client = DeFiStream()
decoders = client.decoders()
print(decoders)  # ['native_token', 'erc20', 'aave_v3', 'uniswap_v3', 'lido', 'stader', 'threshold']

Configuration

Environment Variables

export DEFISTREAM_API_KEY=dsk_your_api_key
export DEFISTREAM_BASE_URL=https://api.defistream.dev/v1  # optional

from defistream import DeFiStream

# API key from environment
client = DeFiStream()

# Or explicit
client = DeFiStream(api_key="dsk_...", base_url="https://api.defistream.dev/v1")
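The lookup order described here (an explicit argument first, then the environment) can be sketched as follows. resolve_config is a hypothetical helper, not part of the client; treating https://api.defistream.dev/v1 as the default base URL and letting an explicit base_url override DEFISTREAM_BASE_URL are assumptions.

```python
import os
from typing import Mapping, Optional, Tuple

# Assumption: the optional URL shown above is also the client's default.
DEFAULT_BASE_URL = "https://api.defistream.dev/v1"


def resolve_config(
    api_key: Optional[str] = None,
    base_url: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
) -> Tuple[str, str]:
    """Hypothetical helper: explicit arguments win, then environment variables."""
    env = os.environ if env is None else env
    key = api_key or env.get("DEFISTREAM_API_KEY")
    if not key:
        raise ValueError("no API key: pass api_key= or set DEFISTREAM_API_KEY")
    url = base_url or env.get("DEFISTREAM_BASE_URL") or DEFAULT_BASE_URL
    return key, url
```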

Timeout and Retries

client = DeFiStream(
    api_key="dsk_...",
    timeout=60.0,  # seconds
    max_retries=3
)

Error Handling

from defistream import DeFiStream
from defistream.exceptions import (
    DeFiStreamError,
    AuthenticationError,
    QuotaExceededError,
    RateLimitError,
    ValidationError
)

client = DeFiStream()

try:
    df = (
        client.erc20.transfers("USDT")
        .network("ETH")
        .block_range(21000000, 21010000)
        .as_df()
    )
except AuthenticationError:
    print("Invalid API key")
except QuotaExceededError as e:
    print(f"Quota exceeded. Remaining: {e.remaining}")
except RateLimitError as e:
    print(f"Rate limited. Retry after: {e.retry_after}s")
except ValidationError as e:
    print(f"Invalid request: {e.message}")
except DeFiStreamError as e:
    print(f"API error: {e}")
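The client's max_retries setting may already cover some failures; if you want explicit control over rate limiting, a wrapper can honor the documented retry_after attribute. This is a hypothetical sketch: RateLimitError here is a local stand-in so the code runs offline (swap in the one from defistream.exceptions), and the exponential fallback used when retry_after is missing is an assumption.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class RateLimitError(Exception):
    """Local stand-in so this sketch runs offline. In real code use:
    from defistream.exceptions import RateLimitError
    """

    def __init__(self, retry_after: Optional[float] = None) -> None:
        super().__init__(f"rate limited (retry_after={retry_after})")
        self.retry_after = retry_after


def call_with_rate_limit_retry(
    fn: Callable[[], T],
    attempts: int = 3,
    default_wait: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(), retrying on RateLimitError up to `attempts` times in total."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except RateLimitError as exc:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # Prefer the server's hint; otherwise back off exponentially (1s, 2s, 4s, ...).
            wait = exc.retry_after if exc.retry_after is not None else default_wait * 2 ** (attempt - 1)
            sleep(wait)
    raise AssertionError("unreachable")
```

Usage would wrap the whole builder chain in a lambda, for example `call_with_rate_limit_retry(lambda: client.erc20.transfers("USDT").network("ETH").block_range(21000000, 21010000).as_df())`.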

Response Headers

Rate-limit and quota information from the most recent response is available on client.last_response:

df = (
    client.erc20.transfers("USDT")
    .network("ETH")
    .block_range(21000000, 21010000)
    .as_df()
)

# Access response metadata
print(f"Rate limit: {client.last_response.rate_limit}")
print(f"Remaining quota: {client.last_response.quota_remaining}")
print(f"Request cost: {client.last_response.request_cost}")

Builder Methods Reference

Common Methods (all protocols)

Method                   | Description
------------------------ | -----------
.network(net)            | Set network (ETH, ARB, BASE, OP, POLYGON, etc.)
.start_block(n)          | Set starting block number
.end_block(n)            | Set ending block number
.block_range(start, end) | Set both start and end blocks
.start_time(ts)          | Set starting time (ISO format or Unix timestamp)
.end_time(ts)            | Set ending time (ISO format or Unix timestamp)
.time_range(start, end)  | Set both start and end times
.verbose()               | Include all metadata fields
.with_value()            | Enrich events with USD value data (value_usd column)
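Because start_time and end_time accept either form, it can help to normalize timestamps yourself, for example when computing ranges programmatically. to_unix is a hypothetical helper; treating ISO strings without an offset as UTC is an assumption, since this document does not state which time zone the API uses for them.

```python
from datetime import datetime, timezone
from typing import Union


def to_unix(ts: Union[str, int, float]) -> int:
    """Normalize an ISO 8601 string or a Unix timestamp to integer Unix seconds.

    Assumption: ISO strings without an offset are interpreted as UTC.
    """
    if isinstance(ts, (int, float)):
        return int(ts)
    dt = datetime.fromisoformat(ts)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return int(dt.timestamp())
```

Under that UTC assumption, .time_range("2024-01-01T00:00:00", "2024-01-02T00:00:00") and .time_range(1704067200, 1704153600) should describe the same window.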

Protocol-Specific Parameters

Method                 | Protocols     | Description
---------------------- | ------------- | -----------
.token(*symbols)       | ERC20         | Token symbol(s) (e.g. USDT, USDC) or a contract address. Multiple tokens are accepted only as known symbols (multi-value).
.sender(*addrs)        | ERC20, Native | Filter by sender address (multi-value)
.receiver(*addrs)      | ERC20, Native | Filter by receiver address (multi-value)
.involving(*addrs)     | All           | Filter by any involved address (multi-value)
.from_address(*addrs)  | ERC20, Native | Alias for .sender()
.to_address(*addrs)    | ERC20, Native | Alias for .receiver()
.min_amount(amt)       | ERC20, Native | Minimum transfer amount
.max_amount(amt)       | ERC20, Native | Maximum transfer amount
.eth_market_type(type) | AAVE          | Market type for ETH: 'Core', 'Prime', 'EtherFi'
.symbol0(sym)          | Uniswap       | First token symbol (required)
.symbol1(sym)          | Uniswap       | Second token symbol (required)
.fee(tier)             | Uniswap       | Fee tier: 100, 500, 3000, 10000 (required)

Address Label & Category Filters

Filter events by entity names or categories using the labels database. Available on all protocols.

Method                   | Protocols     | Description
------------------------ | ------------- | -----------
.involving_label(label)  | All           | Filter where any involved address matches a label substring (e.g., "Binance")
.involving_category(cat) | All           | Filter where any involved address matches a category (e.g., "exchange")
.sender_label(label)     | ERC20, Native | Filter sender by label substring
.sender_category(cat)    | ERC20, Native | Filter sender by category
.receiver_label(label)   | ERC20, Native | Filter receiver by label substring
.receiver_category(cat)  | ERC20, Native | Filter receiver by category

Multi-value support: Pass multiple values as separate arguments (e.g., .sender_label("Binance", "Coinbase")) or as a comma-separated string (e.g., .sender_label("Binance,Coinbase")). Both forms are equivalent.
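The equivalence of the two multi-value forms can be sketched as a normalization step. normalize_multi is hypothetical, and stripping whitespace around commas is an assumption rather than documented behavior.

```python
from typing import List


def normalize_multi(*values: str) -> List[str]:
    """Flatten separate arguments and comma-separated strings into one list.

    Hypothetical helper; whitespace stripping is an assumption.
    """
    out: List[str] = []
    for value in values:
        # Split each argument on commas and drop empty pieces.
        out.extend(part.strip() for part in value.split(",") if part.strip())
    return out
```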

Mutual exclusivity: Within each slot (involving, sender, receiver), only one of address, label, or category can be set. In addition, involving* filters cannot be combined with sender* or receiver* filters.
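The exclusivity rules can be expressed as a small validator. This is an illustrative sketch only: validate_filters and its slot-to-kinds input shape are hypothetical, and the real client or API presumably enforces these rules itself.

```python
from typing import Mapping, Set

SLOTS = ("involving", "sender", "receiver")
KINDS = ("address", "label", "category")


def validate_filters(filters: Mapping[str, Mapping[str, object]]) -> None:
    """Raise ValueError if `filters` breaks the exclusivity rules.

    Hypothetical input shape: slot -> {kind: value}, e.g.
    {"sender": {"category": "exchange"}, "receiver": {"category": "defi"}}.
    """
    used_slots: Set[str] = set()
    for slot, kinds in filters.items():
        if slot not in SLOTS:
            raise ValueError(f"unknown slot {slot!r}")
        unknown = set(kinds) - set(KINDS)
        if unknown:
            raise ValueError(f"unknown filter kind(s) {sorted(unknown)} in {slot!r}")
        set_kinds = [k for k in KINDS if kinds.get(k) is not None]
        # Rule 1: at most one of address/label/category per slot.
        if len(set_kinds) > 1:
            raise ValueError(f"{slot}: only one of address/label/category may be set, got {set_kinds}")
        if set_kinds:
            used_slots.add(slot)
    # Rule 2: involving* cannot be mixed with sender*/receiver*.
    if "involving" in used_slots and used_slots & {"sender", "receiver"}:
        raise ValueError("involving* filters cannot be combined with sender*/receiver* filters")
```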

Aggregate Methods

Method                            | Description
--------------------------------- | -----------
.aggregate(group_by, period)      | Transition to aggregate query. group_by: "time" or "block". period: bucket size (e.g. "1h", "100b"). Returns an AggregateQueryBuilder that supports all the same terminal and filter methods.
client.aggregate_schema(protocol) | Get available aggregate fields for a protocol (e.g. "erc20", "aave_v3").

Terminal Methods

Method                       | Description
---------------------------- | -----------
.as_df()                     | Execute and return pandas DataFrame
.as_df("polars")             | Execute and return polars DataFrame
.as_file(path)               | Execute and save to file (format from extension)
.as_file(path, format="csv") | Execute and save with explicit format
.as_dict()                   | Execute and return list of dicts (JSON, 10K block limit)
.as_link()                   | Execute and return download link (CSV, 1hr expiry)
.as_link(format="parquet")   | Execute and return download link (Parquet)
.calculate_cost()            | Estimate query cost without executing (no quota deducted)

License

MIT License


Download files

Source Distribution

defistream-1.6.0.tar.gz (28.0 kB)

Uploaded Source

Built Distribution

defistream-1.6.0-py3-none-any.whl (19.5 kB)

Uploaded Python 3

File details

Details for the file defistream-1.6.0.tar.gz.

File metadata

  • Download URL: defistream-1.6.0.tar.gz
  • Upload date:
  • Size: 28.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for defistream-1.6.0.tar.gz
Algorithm   | Hash digest
----------- | -----------
SHA256      | 261c8252036e80b227cc69e4fba38c465d259f604abf6352b3fa857ef1b52d91
MD5         | 97412594de9553d892d2d973194f5869
BLAKE2b-256 | acadbb171ab4a736f06ebed11ec951205fc341b8fb216f259e53cec778f203d7

File details

Details for the file defistream-1.6.0-py3-none-any.whl.

File metadata

  • Download URL: defistream-1.6.0-py3-none-any.whl
  • Upload date:
  • Size: 19.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for defistream-1.6.0-py3-none-any.whl
Algorithm   | Hash digest
----------- | -----------
SHA256      | 9323f6e8cca0962acc6115d1fa6c8f4b2c8259c3866ae1074eafa9e7c744d08d
MD5         | f94c1ee3d0a744afa90776ba910777d6
BLAKE2b-256 | 445422eef833b0982fadcb377e0ecb869a6b9ca5972d21dfcaa2ce8852b72a02
