Shared constants and utilities for ChainSwarm blockchain analytics projects

chainswarm-core

Requires Python 3.13+.

Overview

chainswarm-core provides a single source of truth for:

  • Blockchain network definitions - Network types, block times, native assets
  • Address classifications - Address types, trust levels, risk mappings
  • Pattern detection constants - Pattern types, detection methods, role classifications
  • Database utilities - ClickHouse repository base class and row conversion utilities
  • Observability - Logging, metrics (Prometheus), and graceful shutdown
  • Celery jobs - Celery app factory, base task class, loguru integration

This package eliminates code duplication across ChainSwarm projects, including:

  • data-pipeline
  • chain-synthetics
  • analytics-pipeline
  • ml-pipeline
  • benchmark
  • risk-scoring

Installation

pip install chainswarm-core

For development:

pip install "chainswarm-core[dev]"

Quick Start

from chainswarm_core import (
    AddressTypes,
    Network,
    NetworkType,
    RiskLevels,
    TrustLevels,
)

# Check network type
if Network.get_node_type("polkadot") == NetworkType.SUBSTRATE:
    print("Polkadot is a Substrate network")

# Get block time
block_time = Network.get_block_time("bitcoin")  # Returns 600 seconds

# Check address risk
from chainswarm_core.constants import get_address_type_risk_level, is_high_risk_address_type

risk = get_address_type_risk_level(AddressTypes.MIXER)  # Returns "critical"
is_risky = is_high_risk_address_type(AddressTypes.GAMBLING)  # Returns True

Modules

chainswarm_core.constants.networks

Network type classifications and blockchain network enum.

from chainswarm_core.constants.networks import (
    NetworkType,      # SUBSTRATE, EVM, UTXO
    Network,          # Enum of supported networks
    substrate_networks,
    evm_networks,
    utxo_networks,
)

# Get native asset symbol
symbol = Network.get_native_asset_symbol("bittensor")  # Returns "TAO"
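
The lookups above can be pictured as reads from a small metadata table keyed by network name. The following is a minimal, self-contained sketch of that idea and not the package's implementation: SketchNetworkType, NetworkInfo, lookup and the table entries are hypothetical names and illustrative values, chosen to agree with the examples in this README (600 seconds for Bitcoin, "TAO" for Bittensor, Substrate for Polkadot).

```python
from dataclasses import dataclass
from enum import Enum


class SketchNetworkType(str, Enum):
    """Stand-in for the three network families named in this README."""
    SUBSTRATE = "substrate"
    EVM = "evm"
    UTXO = "utxo"


@dataclass(frozen=True)
class NetworkInfo:
    node_type: SketchNetworkType
    block_time_seconds: int
    native_asset_symbol: str


# Illustrative table only; the real package defines its own networks and values.
_NETWORK_INFO = {
    "bitcoin": NetworkInfo(SketchNetworkType.UTXO, 600, "BTC"),
    "polkadot": NetworkInfo(SketchNetworkType.SUBSTRATE, 6, "DOT"),
    "bittensor": NetworkInfo(SketchNetworkType.SUBSTRATE, 12, "TAO"),
}


def lookup(network: str) -> NetworkInfo:
    """Return metadata for a network name, ignoring case."""
    try:
        return _NETWORK_INFO[network.lower()]
    except KeyError:
        raise ValueError(f"Unsupported network: {network!r}") from None


print(lookup("bitcoin").block_time_seconds)
print(lookup("Bittensor").native_asset_symbol)
```

Keeping all per-network facts in one table means a new network is added in a single place, which is the "single source of truth" idea described in the overview.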

chainswarm_core.constants.addresses

Address type and trust level classifications.

from chainswarm_core.constants.addresses import (
    AddressTypes,     # EXCHANGE, DEX, MIXER, VALIDATOR, etc.
    TrustLevels,      # VERIFIED, COMMUNITY, OFFICIAL, etc.
    is_high_risk_address_type,
    is_trusted_address_type,
)

chainswarm_core.constants.risk

Risk levels, severities, and risk mappings.

from chainswarm_core.constants.addresses import AddressTypes
from chainswarm_core.constants.risk import (
    RiskLevels,       # LOW, MEDIUM, HIGH, CRITICAL
    Severities,       # LOW, MEDIUM, HIGH, CRITICAL
    ADDRESS_TYPE_RISK_MAP,
    AddressSubtypeRiskModifiers,
    get_address_type_risk_level,
    get_subtype_risk_modifier,
)

# Get risk level for address type
risk = get_address_type_risk_level(AddressTypes.SCAM)  # Returns "critical"

# Get risk modifier for subtype
modifier = get_subtype_risk_modifier("uniswap_v3")  # Returns 0.8
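
As a rough illustration of how an address-type risk map can back both a level lookup and a high-risk check, here is a self-contained sketch. It is not the package's code: SketchRiskLevel, SKETCH_RISK_MAP, risk_level_for and is_high_risk are hypothetical names, the "exchange" entry and the MEDIUM default for unknown types are assumptions, and only the "critical" results for mixer and scam and the high-risk status of gambling follow the examples in this README.

```python
from enum import Enum


class SketchRiskLevel(str, Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


# Hypothetical mapping; the real ADDRESS_TYPE_RISK_MAP may differ.
SKETCH_RISK_MAP = {
    "exchange": SketchRiskLevel.LOW,       # assumption
    "gambling": SketchRiskLevel.HIGH,
    "mixer": SketchRiskLevel.CRITICAL,
    "scam": SketchRiskLevel.CRITICAL,
}

# Levels treated as "high risk" in this sketch.
HIGH_RISK_LEVELS = frozenset({SketchRiskLevel.HIGH, SketchRiskLevel.CRITICAL})


def risk_level_for(
    address_type: str,
    default: SketchRiskLevel = SketchRiskLevel.MEDIUM,  # assumption
) -> SketchRiskLevel:
    """Look up the risk level for an address type, falling back to a default."""
    return SKETCH_RISK_MAP.get(address_type.lower(), default)


def is_high_risk(address_type: str) -> bool:
    """True when the mapped level is HIGH or CRITICAL."""
    return risk_level_for(address_type) in HIGH_RISK_LEVELS


print(risk_level_for("mixer").value)
print(is_high_risk("gambling"))
```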

chainswarm_core.constants.patterns

Pattern detection types and role classifications.

from chainswarm_core.constants.patterns import (
    PatternTypes,     # CYCLE, LAYERING_PATH, SMURFING_NETWORK, etc.
    DetectionMethods, # SCC_ANALYSIS, CYCLE_DETECTION, etc.
    PatternRoles,     # ATTACKER, MULE, HOT_WALLET, etc.
    MALICIOUS_ROLES,
    VICTIM_ROLES,
    BENIGN_ROLES,
    is_malicious_role,
    is_victim_role,
    is_benign_role,
)
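
Role sets with matching predicates can be sketched with frozensets, as below. This is an illustration under stated assumptions rather than the package's definitions: the SKETCH_* sets, the "victim" role name and the grouping of attacker and mule as malicious and hot_wallet as benign are all hypothetical.

```python
# Hypothetical role groupings; the real MALICIOUS_ROLES, VICTIM_ROLES and
# BENIGN_ROLES in the package may contain different members.
SKETCH_MALICIOUS_ROLES = frozenset({"attacker", "mule"})
SKETCH_VICTIM_ROLES = frozenset({"victim"})
SKETCH_BENIGN_ROLES = frozenset({"hot_wallet"})


def sketch_is_malicious_role(role: str) -> bool:
    return role.lower() in SKETCH_MALICIOUS_ROLES


def sketch_is_victim_role(role: str) -> bool:
    return role.lower() in SKETCH_VICTIM_ROLES


def sketch_is_benign_role(role: str) -> bool:
    return role.lower() in SKETCH_BENIGN_ROLES


def classify_role(role: str) -> str:
    """Map a role to a coarse category, or "unknown" if it is in no set."""
    if sketch_is_malicious_role(role):
        return "malicious"
    if sketch_is_victim_role(role):
        return "victim"
    if sketch_is_benign_role(role):
        return "benign"
    return "unknown"


print(classify_role("MULE"))
print(classify_role("hot_wallet"))
```

Immutable sets make membership checks cheap and prevent a consumer from accidentally mutating a shared constant.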

chainswarm_core.db

ClickHouse database utilities.

from chainswarm_core.db import (
    BaseRepository,
    row_to_dict,
    convert_clickhouse_enum,
    clickhouse_row_to_pydantic,
    rows_to_pydantic_list,
)

# Create a repository
class MyRepository(BaseRepository):
    @classmethod
    def schema(cls) -> str:
        return "my_table.sql"
    
    @classmethod
    def table_name(cls) -> str:
        return "my_table"

# Convert rows to Pydantic models
from pydantic import BaseModel

class MyModel(BaseModel):
    id: int
    name: str

rows = [(1, "first"), (2, "second")]
columns = ["id", "name"]
models = rows_to_pydantic_list(MyModel, rows, columns)
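
Conceptually, this kind of row conversion pairs each tuple with the column names and passes the result to the model constructor. The sketch below shows that idea using only the standard library; it is not the package's implementation. sketch_row_to_dict and sketch_rows_to_models are hypothetical names, and a dataclass stands in for the Pydantic model so the example has no third-party dependencies.

```python
from dataclasses import dataclass
from typing import Any, Sequence, TypeVar

T = TypeVar("T")


def sketch_row_to_dict(row: Sequence[Any], columns: Sequence[str]) -> dict[str, Any]:
    """Pair column names with row values, rejecting mismatched lengths."""
    if len(row) != len(columns):
        raise ValueError(
            f"Row has {len(row)} values but {len(columns)} columns were given"
        )
    return dict(zip(columns, row))


def sketch_rows_to_models(
    model: type[T], rows: Sequence[Sequence[Any]], columns: Sequence[str]
) -> list[T]:
    """Build one model instance per row using keyword arguments."""
    return [model(**sketch_row_to_dict(row, columns)) for row in rows]


@dataclass
class MyRecord:  # stand-in for a Pydantic model
    id: int
    name: str


records = sketch_rows_to_models(
    MyRecord, [(1, "first"), (2, "second")], ["id", "name"]
)
print(records)
```

The explicit length check matters because zip() silently truncates to the shorter input, which would otherwise hide a column list that is out of sync with the query.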

chainswarm_core.observability

Unified logging, metrics, and shutdown handling.

Logging

from chainswarm_core.observability import (
    setup_logger,
    generate_correlation_id,
    get_correlation_id,
    set_correlation_id,
)

setup_logger("my-service")

correlation_id = generate_correlation_id()
set_correlation_id(correlation_id)

from loguru import logger
logger.info("Processing request")
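
One common way to carry a correlation ID through a request is a context variable that a log filter reads on every record. The sketch below demonstrates that pattern with the standard library only. It is an assumption about the general technique, not a description of how chainswarm_core implements it: the sketch_* functions and CorrelationIdFilter are hypothetical, and the standard logging module is used here in place of loguru.

```python
import contextvars
import logging
import uuid
from typing import Optional

# Context-local storage, so concurrent requests or tasks do not share an ID.
_correlation_id = contextvars.ContextVar("correlation_id", default=None)


def sketch_generate_correlation_id() -> str:
    """Create a random 32-character hexadecimal identifier."""
    return uuid.uuid4().hex


def sketch_set_correlation_id(value: str) -> None:
    _correlation_id.set(value)


def sketch_get_correlation_id() -> Optional[str]:
    return _correlation_id.get()


class CorrelationIdFilter(logging.Filter):
    """Attach the current correlation ID to every log record."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.correlation_id = sketch_get_correlation_id() or "-"
        return True


handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter("%(correlation_id)s %(levelname)s %(message)s")
)
handler.addFilter(CorrelationIdFilter())

log = logging.getLogger("my-service")
log.addHandler(handler)
log.setLevel(logging.INFO)

sketch_set_correlation_id(sketch_generate_correlation_id())
log.info("Processing request")
```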

Graceful Shutdown

from chainswarm_core.observability import (
    terminate_event,
    install_shutdown_handlers,
)

install_shutdown_handlers()

# process_batch() is a placeholder for your own unit of work
while not terminate_event.is_set():
    process_batch()
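
The loop above relies on an event that signal handlers set when the process is asked to stop. A minimal standard-library sketch of that pattern follows. It is an illustration and not the package's code: sketch_terminate_event, sketch_install_shutdown_handlers and run_until_terminated are hypothetical names, and the choice of SIGINT and SIGTERM is an assumption.

```python
import signal
import threading

sketch_terminate_event = threading.Event()


def _handle_signal(signum, frame) -> None:
    """Signal handler: request a clean stop instead of exiting immediately."""
    sketch_terminate_event.set()


def sketch_install_shutdown_handlers() -> None:
    """Register the handler; signal.signal() must be called from the main thread."""
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, _handle_signal)


def run_until_terminated(process_batch) -> int:
    """Call process_batch() repeatedly until the event is set."""
    processed = 0
    while not sketch_terminate_event.is_set():
        process_batch()
        processed += 1
    return processed


# Demo: a real service would call sketch_install_shutdown_handlers() once at
# startup. Here the handler is invoked directly to simulate a SIGTERM.
batches = []


def demo_batch() -> None:
    batches.append(len(batches))
    if len(batches) == 3:
        _handle_signal(signal.SIGTERM, None)


print(run_until_terminated(demo_batch))
```

Because the event is only checked between batches, the batch in progress always finishes before the loop exits, which is what makes the shutdown graceful.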

Prometheus Metrics

from chainswarm_core.observability import (
    setup_metrics,
    get_metrics_registry,
    MetricsRegistry,
    DURATION_BUCKETS,
)

PORT_MAPPING = {
    "my-service-indexer": 9101,
    "my-service-api": 9200,
}

metrics = setup_metrics("my-service-indexer", port_mapping=PORT_MAPPING)

blocks_counter = metrics.create_counter(
    "blocks_processed_total",
    "Total blocks processed",
    labelnames=["network"]
)
blocks_counter.labels(network="torus").inc()

processing_time = metrics.create_histogram(
    "block_processing_seconds",
    "Block processing duration",
    buckets=DURATION_BUCKETS
)
# process_block() is a placeholder for your own code
with processing_time.time():
    process_block()

Metrics Decorator

from chainswarm_core.observability import manage_metrics

@manage_metrics(success_metric_name="task_success", failure_metric_name="task_failure")
def run_task():
    pass
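
A decorator of this shape typically increments one metric when the wrapped function returns and another when it raises. The sketch below shows that control flow with a plain Counter in place of Prometheus metrics. The behavior is inferred from the parameter names and is an assumption, not the package's implementation; sketch_manage_metrics and sketch_counters are hypothetical.

```python
import functools
from collections import Counter

# In-memory stand-in for a metrics registry.
sketch_counters = Counter()


def sketch_manage_metrics(success_metric_name: str, failure_metric_name: str):
    """Count successful and failed calls of the decorated function."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                result = func(*args, **kwargs)
            except Exception:
                sketch_counters[failure_metric_name] += 1
                raise  # never swallow the original error
            sketch_counters[success_metric_name] += 1
            return result

        return wrapper

    return decorator


@sketch_manage_metrics("task_success", "task_failure")
def ok_task() -> str:
    return "done"


@sketch_manage_metrics("task_success", "task_failure")
def failing_task() -> None:
    raise RuntimeError("boom")


ok_task()
try:
    failing_task()
except RuntimeError:
    pass

print(dict(sketch_counters))
```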

chainswarm_core.jobs

Celery infrastructure with loguru integration and JSON beat schedule loading.

Create Celery App

from chainswarm_core.jobs import create_celery_app

celery_app = create_celery_app(
    name="my-service-jobs",
    autodiscover=["packages.jobs.tasks"],
    beat_schedule_path="packages/jobs/beat_schedule.json",
)

Define Tasks

from chainswarm_core.jobs import BaseTask, BaseTaskContext, BaseTaskResult
from typing import Any, Dict

class MyTask(BaseTask):
    name = "my_task"

    def execute_task(self, context: Dict[str, Any]) -> Dict[str, Any]:
        ctx = BaseTaskContext(**context)
        return BaseTaskResult(
            network=ctx.network,
            status="success",
            processing_date=ctx.processing_date,
        ).__dict__

# celery_app is the app created with create_celery_app() above
my_task = celery_app.register_task(MyTask())

Beat Schedule JSON

{
  "my-task-every-hour": {
    "task": "packages.jobs.tasks.my_task",
    "schedule": "0 * * * *",
    "args": [{"network": "torus", "processing_date": "2024-01-01"}]
  }
}

Cron strings in the "schedule" field are automatically converted to Celery crontab() objects.
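
To make the conversion concrete, the sketch below splits a five-field cron string into the keyword arguments accepted by Celery's crontab schedule (minute, hour, day_of_month, month_of_year, day_of_week). It is a stand-alone illustration, not the package's loader: cron_to_crontab_kwargs and load_beat_schedule are hypothetical helpers, and Celery itself is not imported so the example runs without it.

```python
import json

# Standard cron field order, named after celery.schedules.crontab keywords.
CRON_FIELDS = ("minute", "hour", "day_of_month", "month_of_year", "day_of_week")


def cron_to_crontab_kwargs(expr: str) -> dict[str, str]:
    """Split "m h dom mon dow" into keyword arguments for crontab()."""
    parts = expr.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"Expected 5 cron fields, got {len(parts)}: {expr!r}")
    return dict(zip(CRON_FIELDS, parts))


def load_beat_schedule(raw_json: str) -> dict:
    """Parse a beat schedule and replace each cron string with crontab kwargs."""
    schedule = {}
    for name, entry in json.loads(raw_json).items():
        converted = dict(entry)
        converted["schedule"] = cron_to_crontab_kwargs(entry["schedule"])
        schedule[name] = converted
    return schedule


BEAT_JSON = """
{
  "my-task-every-hour": {
    "task": "packages.jobs.tasks.my_task",
    "schedule": "0 * * * *",
    "args": [{"network": "torus", "processing_date": "2024-01-01"}]
  }
}
"""

beat_schedule = load_beat_schedule(BEAT_JSON)
print(beat_schedule["my-task-every-hour"]["schedule"])
```

With Celery installed, each kwargs dictionary could be passed on as crontab(**kwargs) to obtain the schedule object.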

Development Worker

from chainswarm_core.jobs import create_celery_app, run_dev_worker

celery_app = create_celery_app("my-service", ["packages.jobs.tasks"])

if __name__ == "__main__":
    run_dev_worker(celery_app)

Migration Guide

From project-local constants

Before:

from packages.storage.constants import AddressTypes, RiskLevels
from packages.storage.repositories.base_repository import BaseRepository

After:

from chainswarm_core import AddressTypes, RiskLevels, BaseRepository

From project-local repository utils

Before:

from packages.storage.repositories.utils import row_to_dict

After:

from chainswarm_core.db import row_to_dict

Development

Setup

# Clone the repository
git clone https://github.com/chainswarm/core.git
cd core

# Install with dev dependencies
pip install -e ".[dev]"

# Run tests
pytest tests/ -v

Running Tests

# All tests
pytest

# With coverage
pytest --cov=chainswarm_core --cov-report=html

# Specific module
pytest tests/test_constants/test_networks.py -v

CI/CD

  • CI: Runs on every push and PR to main
    • Tests on Python 3.13
  • Publish: Manual workflow dispatch to publish to PyPI
    • Requires version match in pyproject.toml
    • Creates GitHub release with tag

Contributing

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Make your changes
  4. Run tests
  5. Commit your changes (git commit -m 'Add amazing feature')
  6. Push to the branch (git push origin feature/amazing-feature)
  7. Open a Pull Request

License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
