Skip to main content

No project description provided

Project description


This library is a simple, but flexible / configurable Algorand transaction subscription / indexing mechanism. It allows you to quickly create Python services that follow or subscribe to the Algorand Blockchain.

pip install algokit_subscriber

Documentation

Quick start

import algokit_subscriber as sub
from algokit_utils import AlgorandClient

localnet = AlgorandClient.default_localnet()

# Create subscriber
subscriber = sub.AlgorandSubscriber(
    config=sub.AlgorandSubscriberConfig(
        filters=[
            sub.SubscriberConfigFilter(
                name="filter1",
                type="pay",
                sender="ABC...",
            ),
        ],
        watermark_persistence=sub.in_memory_watermark(),
        sync_behaviour="skip-sync-newest",
    ),
    algod_client=localnet.client.algod,
    indexer_client=localnet.client.indexer,  # only needed when sync_behaviour="catchup-with-indexer"
)

# Set up subscription(s)
def on_filter1(transaction: sub.SubscribedTransaction, filter_name: str) -> None:
    ...

subscriber.on("filter1", on_filter1)

# Either: Start the subscriber (if in long-running process)
subscriber.start()

# OR: Poll the subscriber (if in cron job / periodic lambda)
subscriber.poll_once()

Key features

  • Notification and indexing - You have fine-grained control over the syncing behaviour and can control the number of rounds to sync at a time, the pattern of syncing i.e. start from the beginning of the chain, or start from the tip; drop stale records if your service can't keep up or keep syncing from where you are up to; etc.
  • Low latency processing - When your service has caught up to the tip of the chain it can optionally wait for new rounds so you have a low latency reaction to a new round occurring
  • Watermarking and resilience - You can create reliable syncing / indexing services through a simple round watermarking capability that allows you to create resilient syncing services that can recover from an outage
  • Extensive subscription filtering - You can filter by transaction type, sender, receiver, note prefix, apps (ID, creation, on complete, ARC-4 method signature, call arguments, ARC-28 events), assets (ID, creation, amount transferred range), transfers (amount transferred range) and balance changes (algo and assets)
  • ARC-28 event subscription support - You can subscribe to ARC-28 events for a smart contract
  • Balance change support - Subscribed transactions will have all algo and asset balance changes calculated for you and you can also subscribe to balance changes that meet certain criteria
  • First-class inner transaction support - Your filter will find arbitrarily nested inner transactions and return that transaction (indexer can't do this!)
  • State-proof support - You can subscribe to state proof transactions
  • Simple programming model - It's really easy to use and consume through easy to use, type-safe methods and objects and subscribed transactions have a comprehensive and familiar model type with all relevant/useful information about that transaction (including things like transaction id, round number, created asset/app id, app logs, etc.) modelled on the indexer data model (which is used regardless of whether the transactions come from indexer or algod so it's a consistent experience)
  • Easy to deploy - You have full control over how you want to deploy and use the subscriber; it will work with whatever persistence (e.g. sql, no-sql, etc.), queuing/messaging (e.g. queues, topics, buses, web hooks, web sockets) and compute (e.g. serverless periodic lambdas, continually running containers, virtual machines, etc.) services you want to use
  • Fast initial index - There is an indexer catch up mode that allows you to use indexer to catch up to the tip of the chain in seconds or minutes rather than days; alternatively, if you prefer to just use algod and not indexer that option is available too!

Balance change notes

The balance change semantics work mostly as expected, however the semantics around asset creation and destruction warrants further clarification.

When an asset is created, the full asset supply is attributed to the asset creators account.

The balance change for an asset create transaction will be as below:

import algokit_subscriber as sub

sub.BalanceChange(
    address="VIDHG4SYANCP2GUQXXSFSNBPJWS4TAQSI3GH4GYO54FSYPDIBYPMSF7HBY",  # The asset creator
    asset_id=2391,  # The created asset id
    amount=100000,  # Full asset supply of the created asset
    roles=[sub.BalanceChangeRole.AssetCreator],
)

When an asset is destroyed, the full asset supply must be in the asset creators account and the asset manager must send the destroy transaction. Unfortunately we cannot determine the asset creator or full asset supply from the transaction data. As a result the balance change will always be attributed to the asset manager and will have a 0 amount. If you need to account for the asset supply being destroyed from the creators account, you'll need to handle this separately.

The balance change for an asset destroy transaction will be as below:

import algokit_subscriber as sub

sub.BalanceChange(
    address="PIDHG4SYANCP2GUQXXSFSNBPJWS4TAQSI3GH4GYO54FSYPDIBYPMSF7HBY",  # The asset destroyer, which will always be the asset manager
    asset_id=2391,  # The destroyed asset id
    amount=0,  # This value will always be 0
    roles=[sub.BalanceChangeRole.AssetDestroyer],
)

Examples

Data History Museum index

The following code, when algod is pointed to TestNet, will find all transactions emitted by the Data History Museum since the beginning of time in seconds and then find them in real-time as they emerge on the chain.

The watermark is stored in-memory so this particular example is not resilient to restarts. To change that you can implement proper persistence of the watermark.

import algokit_subscriber as sub
from algokit_utils import AlgorandClient

algorand = AlgorandClient.testnet()


subscriber = sub.AlgorandSubscriber(
    config=sub.AlgorandSubscriberConfig(
        filters=[
            # Match asset configuration transactions from DHM creator account
            sub.SubscriberConfigFilter(
                name="dhm-asset",
                type="acfg",
                sender="ER7AMZRPD5KDVFWTUUVOADSOWM4RQKEEV2EDYRVSA757UHXOIEKGMBQIVU",
            ),
        ],
        frequency_in_seconds=5,
        max_rounds_to_sync=100,
        sync_behaviour="catchup-with-indexer",
        watermark_persistence=sub.in_memory_watermark(),
    ),
    algod_client=algorand.client.algod,
    indexer_client=algorand.client.indexer,
)


def process_dhm_assets(transactions: list[sub.SubscribedTransaction], filter_name: str) -> None:
    print(f"Received {len(transactions)} asset changes")
    # ... do stuff with the transactions


# Attach our callback to the 'dhm-asset' filter
subscriber.on_batch("dhm-asset", process_dhm_assets)


def handle_error(error: Exception) -> None:
    print(f"An error occurred: {error}")


# Attach the error handler
subscriber.on_error(handle_error)

# Start the subscriber
subscriber.start()

USDC real-time monitoring

The following code, when algod is pointed to MainNet, will find all transfers of USDC that are greater than $1 and it will poll every 1s for new transfers.

import algokit_subscriber as sub
from algokit_utils import AlgorandClient

algorand = AlgorandClient.mainnet()

subscriber = sub.AlgorandSubscriber(
    config=sub.AlgorandSubscriberConfig(
        filters=[
            sub.SubscriberConfigFilter(
                name="usdc",
                type="axfer",
                asset_id=31566704,  # MainNet: USDC
                min_amount=1_000_000,  # $1
            ),
        ],
        wait_for_block_when_at_tip=True,
        sync_behaviour="skip-sync-newest",
        watermark_persistence=sub.in_memory_watermark(),
    ),
    algod_client=algorand.client.algod,
)


def process_usdc_transfer(transfer: sub.SubscribedTransaction, filter_name: str) -> None:
    axfer = transfer.asset_transfer_transaction
    if axfer is None:
        return
    amount = axfer.amount / 1_000_000
    print(f"{transfer.sender} sent {axfer.receiver} USDC${amount:.2f} in transaction {transfer.id_}")


# Attach our callback to the 'usdc' filter
subscriber.on("usdc", process_usdc_transfer)


def handle_error(error: Exception) -> None:
    print(f"An error occurred: {error}")


# Attach the error handler
subscriber.on_error(handle_error)

# Start the subscriber
subscriber.start()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

algokit_subscriber-2.0.0b1.tar.gz (446.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

algokit_subscriber-2.0.0b1-py3-none-any.whl (30.8 kB view details)

Uploaded Python 3

File details

Details for the file algokit_subscriber-2.0.0b1.tar.gz.

File metadata

  • Download URL: algokit_subscriber-2.0.0b1.tar.gz
  • Upload date:
  • Size: 446.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for algokit_subscriber-2.0.0b1.tar.gz
Algorithm Hash digest
SHA256 9577faac513f08b63e0e0b4352941b1270d9afdf8968b16f83b61c8154a7042f
MD5 0741f7f4e12c30654ccc33b9fbfd607e
BLAKE2b-256 6e9760924d91100ac900ad5406d8bd408685042debe83443df52cebafc90334b

See more details on using hashes here.

Provenance

The following attestation bundles were made for algokit_subscriber-2.0.0b1.tar.gz:

Publisher: cd.yaml on algorandfoundation/algokit-subscriber-py

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file algokit_subscriber-2.0.0b1-py3-none-any.whl.

File metadata

File hashes

Hashes for algokit_subscriber-2.0.0b1-py3-none-any.whl
Algorithm Hash digest
SHA256 1585af6d966bd2055d21d8c83308844b40e58dd236904d22f21d9f44940a9199
MD5 ac89ee1dd20df0b5d5a35f4122aa31af
BLAKE2b-256 bf5ddf24b83dc8a0fae63104d81833c98f955b125eaba6df78cdeae6d45dafa8

See more details on using hashes here.

Provenance

The following attestation bundles were made for algokit_subscriber-2.0.0b1-py3-none-any.whl:

Publisher: cd.yaml on algorandfoundation/algokit-subscriber-py

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page