# chainswarm-core

Shared constants and utilities for ChainSwarm blockchain analytics projects.

## Overview

`chainswarm-core` provides a single source of truth for:
- **Blockchain network definitions** - network types, block times, native assets
- **Address classifications** - address types, trust levels, risk mappings
- **Pattern detection constants** - pattern types, detection methods, role classifications
- **Database utilities** - ClickHouse repository base class and row conversion utilities
- **Observability** - logging, metrics (Prometheus), and graceful shutdown
- **Celery jobs** - Celery app factory, base task class, loguru integration
This package eliminates code duplication across ChainSwarm projects, including `data-pipeline`, `chain-synthetics`, `analytics-pipeline`, `ml-pipeline`, `benchmark`, and `risk-scoring`.
## Installation

```shell
pip install chainswarm-core
```

For development:

```shell
pip install "chainswarm-core[dev]"
```
## Quick Start

```python
from chainswarm_core import (
    AddressTypes,
    Network,
    NetworkType,
    RiskLevels,
    TrustLevels,
)

# Check network type
if Network.get_node_type("polkadot") == NetworkType.SUBSTRATE:
    print("Polkadot is a Substrate network")

# Get block time
block_time = Network.get_block_time("bitcoin")  # Returns 600 seconds

# Check address risk
from chainswarm_core.constants import get_address_type_risk_level, is_high_risk_address_type

risk = get_address_type_risk_level(AddressTypes.MIXER)  # Returns "critical"
is_risky = is_high_risk_address_type(AddressTypes.GAMBLING)  # Returns True
```
## Modules

### chainswarm_core.constants.networks

Network type classifications and the blockchain network enum.

```python
from chainswarm_core.constants.networks import (
    NetworkType,         # SUBSTRATE, EVM, UTXO
    Network,             # Enum of supported networks
    substrate_networks,
    evm_networks,
    utxo_networks,
)

# Get native asset symbol
symbol = Network.get_native_asset_symbol("bittensor")  # Returns "TAO"
```
### chainswarm_core.constants.addresses

Address type and trust level classifications.

```python
from chainswarm_core.constants.addresses import (
    AddressTypes,  # EXCHANGE, DEX, MIXER, VALIDATOR, etc.
    TrustLevels,   # VERIFIED, COMMUNITY, OFFICIAL, etc.
    is_high_risk_address_type,
    is_trusted_address_type,
)
```
### chainswarm_core.constants.risk

Risk levels, severities, and risk mappings.

```python
from chainswarm_core.constants.risk import (
    RiskLevels,   # LOW, MEDIUM, HIGH, CRITICAL
    Severities,   # LOW, MEDIUM, HIGH, CRITICAL
    ADDRESS_TYPE_RISK_MAP,
    AddressSubtypeRiskModifiers,
    get_address_type_risk_level,
    get_subtype_risk_modifier,
)

# Get risk level for address type
risk = get_address_type_risk_level(AddressTypes.SCAM)  # Returns "critical"

# Get risk modifier for subtype
modifier = get_subtype_risk_modifier("uniswap_v3")  # Returns 0.8
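Conceptually, these helpers behave like dictionary lookups: a base risk level per address type, plus a multiplicative modifier per subtype. The sketch below illustrates that shape; the map contents, the `"medium"` fallback, and the default modifier of `1.0` are assumptions for illustration, not the package's actual data.

```python
# Illustrative stand-ins for the package's risk data; values are assumptions.
ADDRESS_TYPE_RISK_MAP = {
    "mixer": "critical",
    "scam": "critical",
    "gambling": "high",
    "exchange": "low",
}

SUBTYPE_RISK_MODIFIERS = {
    "uniswap_v3": 0.8,  # a well-known DEX deployment lowers effective risk
}


def get_address_type_risk_level(address_type: str) -> str:
    # Unknown types fall back to "medium" in this sketch.
    return ADDRESS_TYPE_RISK_MAP.get(address_type, "medium")


def get_subtype_risk_modifier(subtype: str) -> float:
    # 1.0 means "no adjustment" for subtypes without a specific modifier.
    return SUBTYPE_RISK_MODIFIERS.get(subtype, 1.0)
```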
### chainswarm_core.constants.patterns

Pattern detection types and role classifications.

```python
from chainswarm_core.constants.patterns import (
    PatternTypes,      # CYCLE, LAYERING_PATH, SMURFING_NETWORK, etc.
    DetectionMethods,  # SCC_ANALYSIS, CYCLE_DETECTION, etc.
    PatternRoles,      # ATTACKER, MULE, HOT_WALLET, etc.
    MALICIOUS_ROLES,
    VICTIM_ROLES,
    BENIGN_ROLES,
    is_malicious_role,
    is_victim_role,
    is_benign_role,
)
```
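The role predicates amount to membership tests against the exported role sets. A minimal sketch of that pattern, using frozensets (the role strings and set contents here are assumptions based on the names listed above, not the library's actual definitions):

```python
# Hypothetical role sets; the real MALICIOUS_ROLES etc. live in
# chainswarm_core.constants.patterns.
MALICIOUS_ROLES = frozenset({"attacker", "mule"})
VICTIM_ROLES = frozenset({"victim"})
BENIGN_ROLES = frozenset({"hot_wallet", "exchange"})


def is_malicious_role(role: str) -> bool:
    return role in MALICIOUS_ROLES


def is_victim_role(role: str) -> bool:
    return role in VICTIM_ROLES


def is_benign_role(role: str) -> bool:
    return role in BENIGN_ROLES
```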
### chainswarm_core.db

ClickHouse database utilities.

```python
from chainswarm_core.db import (
    BaseRepository,
    row_to_dict,
    convert_clickhouse_enum,
    clickhouse_row_to_pydantic,
    rows_to_pydantic_list,
)

# Create a repository
class MyRepository(BaseRepository):
    @classmethod
    def schema(cls) -> str:
        return "my_table.sql"

    @classmethod
    def table_name(cls) -> str:
        return "my_table"

# Convert rows to Pydantic models
from pydantic import BaseModel

class MyModel(BaseModel):
    id: int
    name: str

rows = [(1, "first"), (2, "second")]
columns = ["id", "name"]
models = rows_to_pydantic_list(MyModel, rows, columns)
```
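Under the hood, this kind of conversion is just zipping column names with row values and constructing the model from the resulting dict. A self-contained sketch (a dataclass stands in for the Pydantic model, and `rows_to_models` is a hypothetical name, not the library function):

```python
from dataclasses import dataclass
from typing import Any, List, Sequence, Type


@dataclass
class MyModel:
    id: int
    name: str


def rows_to_models(model: Type[Any], rows: Sequence[tuple], columns: Sequence[str]) -> List[Any]:
    # Pair each column name with its value, then build the model instance.
    return [model(**dict(zip(columns, row))) for row in rows]


models = rows_to_models(MyModel, [(1, "first"), (2, "second")], ["id", "name"])
```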
### chainswarm_core.observability

Unified logging, metrics, and shutdown handling.

#### Logging

```python
from chainswarm_core.observability import (
    setup_logger,
    generate_correlation_id,
    get_correlation_id,
    set_correlation_id,
)

setup_logger("my-service")

correlation_id = generate_correlation_id()
set_correlation_id(correlation_id)

from loguru import logger
logger.info("Processing request")
```
#### Graceful Shutdown

```python
from chainswarm_core.observability import (
    terminate_event,
    install_shutdown_handlers,
)

install_shutdown_handlers()

while not terminate_event.is_set():
    process_batch()
```
#### Prometheus Metrics

```python
from chainswarm_core.observability import (
    setup_metrics,
    get_metrics_registry,
    MetricsRegistry,
    DURATION_BUCKETS,
)

PORT_MAPPING = {
    "my-service-indexer": 9101,
    "my-service-api": 9200,
}

metrics = setup_metrics("my-service-indexer", port_mapping=PORT_MAPPING)

blocks_counter = metrics.create_counter(
    "blocks_processed_total",
    "Total blocks processed",
    labelnames=["network"],
)
blocks_counter.labels(network="torus").inc()

processing_time = metrics.create_histogram(
    "block_processing_seconds",
    "Block processing duration",
    buckets=DURATION_BUCKETS,
)
with processing_time.time():
    process_block()
```
#### Metrics Decorator

```python
from chainswarm_core.observability import manage_metrics

@manage_metrics(success_metric_name="task_success", failure_metric_name="task_failure")
def run_task():
    pass
```
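A decorator like this typically wraps the call, records a success metric on normal return, and records a failure metric before re-raising on exception. The sketch below shows that control flow with a plain `Counter` standing in for the Prometheus registry; `manage_metrics` here is a hypothetical re-implementation, not the library's code.

```python
import functools
from collections import Counter

# Stand-in for a metrics registry; the real decorator records Prometheus metrics.
outcome_counts = Counter()


def manage_metrics(success_metric_name: str, failure_metric_name: str):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                result = func(*args, **kwargs)
            except Exception:
                # Count the failure, then let the exception propagate.
                outcome_counts[failure_metric_name] += 1
                raise
            outcome_counts[success_metric_name] += 1
            return result
        return wrapper
    return decorator


@manage_metrics("task_success", "task_failure")
def run_task():
    return "ok"
```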
### chainswarm_core.jobs

Celery infrastructure with loguru integration and JSON beat schedule loading.

#### Create a Celery App

```python
from chainswarm_core.jobs import create_celery_app

celery_app = create_celery_app(
    name="my-service-jobs",
    autodiscover=["packages.jobs.tasks"],
    beat_schedule_path="packages/jobs/beat_schedule.json",
)
```
#### Define Tasks

```python
from typing import Any, Dict

from chainswarm_core.jobs import BaseTask, BaseTaskContext, BaseTaskResult


class MyTask(BaseTask):
    name = "my_task"

    def execute_task(self, context: Dict[str, Any]) -> Dict[str, Any]:
        ctx = BaseTaskContext(**context)
        return BaseTaskResult(
            network=ctx.network,
            status="success",
            processing_date=ctx.processing_date,
        ).__dict__


my_task = celery_app.register_task(MyTask())
```
#### Beat Schedule JSON

```json
{
  "my-task-every-hour": {
    "task": "packages.jobs.tasks.my_task",
    "schedule": "0 * * * *",
    "args": [{"network": "torus", "processing_date": "2024-01-01"}]
  }
}
```

Cron strings are automatically converted to `crontab()` objects.
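The conversion step amounts to splitting the five cron fields and mapping them onto the keyword arguments Celery's `crontab()` accepts (`minute`, `hour`, `day_of_week`, `day_of_month`, `month_of_year`). A rough sketch of that mapping, assuming standard five-field cron syntax (this is illustrative; the real loader lives inside `create_celery_app`):

```python
def cron_to_crontab_kwargs(expr: str) -> dict:
    # Standard cron field order: minute hour day-of-month month day-of-week.
    minute, hour, day_of_month, month_of_year, day_of_week = expr.split()
    # These keyword names match celery.schedules.crontab's parameters.
    return {
        "minute": minute,
        "hour": hour,
        "day_of_month": day_of_month,
        "month_of_year": month_of_year,
        "day_of_week": day_of_week,
    }


kwargs = cron_to_crontab_kwargs("0 * * * *")  # every hour, at minute 0
```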
#### Development Worker

```python
from chainswarm_core.jobs import create_celery_app, run_dev_worker

celery_app = create_celery_app("my-service", ["packages.jobs.tasks"])

if __name__ == "__main__":
    run_dev_worker(celery_app)
```
## Migration Guide

### From project-local constants

Before:

```python
from packages.storage.constants import AddressTypes, RiskLevels
from packages.storage.repositories.base_repository import BaseRepository
```

After:

```python
from chainswarm_core import AddressTypes, RiskLevels, BaseRepository
```

### From project-local repository utils

Before:

```python
from packages.storage.repositories.utils import row_to_dict
```

After:

```python
from chainswarm_core.db import row_to_dict
```
## Development

### Setup

```shell
# Clone the repository
git clone https://github.com/chainswarm/core.git
cd core

# Install with dev dependencies
pip install -e ".[dev]"

# Run tests
pytest tests/ -v
```

### Running Tests

```shell
# All tests
pytest

# With coverage
pytest --cov=chainswarm_core --cov-report=html

# Specific module
pytest tests/test_constants/test_networks.py -v
```
## CI/CD

- **CI**: runs on every push and PR to `main`; tests on Python 3.13
- **Publish**: manual workflow dispatch to publish to PyPI
  - Requires version match in `pyproject.toml`
  - Creates a GitHub release with tag
## Contributing

1. Fork the repository
2. Create a feature branch (`git checkout -b feature/amazing-feature`)
3. Make your changes
4. Run tests
5. Commit your changes (`git commit -m 'Add amazing feature'`)
6. Push to the branch (`git push origin feature/amazing-feature`)
7. Open a Pull Request
## License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
## File details

Details for the file `chainswarm_core-0.1.8.tar.gz`.

### File metadata

- Download URL: chainswarm_core-0.1.8.tar.gz
- Upload date:
- Size: 27.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.7

### File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | `6513166170001787379a09e53c5193835c41ae55f36f08c7733ef181f6aa1380` |
| MD5 | `c5bd0b71f03d725407c44246982221a5` |
| BLAKE2b-256 | `bcf0ae18c2f869038e23f0470b778bbb4d341696aa886bfcb2f02631d55a30ed` |
## File details

Details for the file `chainswarm_core-0.1.8-py3-none-any.whl`.

### File metadata

- Download URL: chainswarm_core-0.1.8-py3-none-any.whl
- Upload date:
- Size: 35.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.7

### File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | `306f3685dca70cbb9eebfa0440a2c783dfa5fc8dfc371c2f4a0f3435cecab0ad` |
| MD5 | `41729c5747d6beb6ff68ce38b68efe0b` |
| BLAKE2b-256 | `c3f4ffbe00efc23dd083f190db0fe8a0065ba7b913ad07ddb1bb88b8c443d23b` |