
Pythonic DCP(Bifrost2) Wrapper

PDCP - Pythonic DCP Wrapper

PDCP is a Python package that provides a more Pythonic interface to the DCP Python SDK (Bifrost2). It simplifies interaction with DCP services through a clean, type-hinted interface for job management and computation.

Features

  • Type-Safe Configuration: Full type hints using TypedDict for job configuration
  • Event Handling: Flexible event subscription system for job monitoring
  • Job Chaining: Basic workflow management through job chaining
  • Compute Groups: Support for DCP compute group management
  • Streaming Slices: Optional streaming support for job slices

Requirements

PDCP requires Python 3.10 or later (below 4.0) and the latest version of Node.js.

Your keystore files must also be in the .dcp folder of your home directory, laid out as shown below, so that DCP can identify you.

└── %USERPROFILE%/.dcp
    ├── default.keystore
    └── id.keystore
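
The requirements above can be checked before installing. The following is a minimal sketch that uses only the standard library, not part of PDCP; the helper names are hypothetical, and it assumes the .dcp folder sits directly under the directory returned by Path.home(). It only checks that a node executable is on PATH, not which version it is.

```python
import shutil
import sys
from pathlib import Path

# File names taken from the directory layout above.
KEYSTORE_FILES = ("default.keystore", "id.keystore")


def python_supported(version=sys.version_info):
    """True when the interpreter version falls in the [3.10, 4.0) range."""
    return (3, 10) <= tuple(version[:2]) < (4, 0)


def node_available():
    """True when a `node` executable is on PATH (the version is not checked)."""
    return shutil.which("node") is not None


def missing_keystores(dcp_dir=None):
    """Return the keystore file names that are absent from dcp_dir."""
    if dcp_dir is None:
        # Assumption: the .dcp folder lives directly in the home directory.
        dcp_dir = Path.home() / ".dcp"
    return [name for name in KEYSTORE_FILES if not (Path(dcp_dir) / name).is_file()]


if __name__ == "__main__":
    print("Python OK:", python_supported())
    print("Node on PATH:", node_available())
    print("Missing keystores:", missing_keystores() or "none")
```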

Installation

pip install pdcp

Quick Start

from pdcp import Job, dcp
from pdcp.custom_types import JobConfig

# Define your work function
def work(x, a):
    dcp.progress()
    return x * a

# Configure your job
config: JobConfig = {
    "name": "test",
    "work_function": work,
    "slices": [1, 2, 3],
    "constant_params": [3],
    "compute_groups": [{"joinKey": "group_name", "joinSecret": "secret"}]
}

# Create and configure the job
job = Job(config)

# Subscribe to job events
job.subscribe_to({
    "readystatechange": print,
    "accepted": lambda _: print(f'accepted: {job.id}'),
    "result": lambda e: print(f'result: {e.result}'),
    "status": lambda e: print(f'status: {e.runStatus}')
})

# Execute the job
job.dispatch()
results = job.get_results()
print(results)
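
Before dispatching, it can help to run the work function locally on the same inputs. The sketch below does not use PDCP at all; run_locally is a hypothetical helper. It assumes that each slice is passed as the first argument and the constant parameters follow, which matches the work(x, a) signature above. The exact shape of what job.get_results() returns is not shown here and may differ.

```python
def work(x, a):
    # dcp.progress() is omitted because this sketch runs locally, without pdcp.
    return x * a


def run_locally(work_function, slices, constant_params=()):
    """Apply work_function to each slice, followed by the constant parameters."""
    return [work_function(s, *constant_params) for s in slices]


local_results = run_locally(work, [1, 2, 3], [3])
print(local_results)  # [3, 6, 9]
```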

Job Configuration

The JobConfig TypedDict provides a type-safe way to configure jobs:

# Callable and Any are imported from typing
class JobConfig(TypedDict):
    name: str                                           # Name of the job
    work_function: Callable                             # Function to be executed
    slices: NotRequired[list]                           # Input data slices
    stream_slices: NotRequired[bool]                    # Enable/disable slice streaming
    constant_params: NotRequired[list[Any]]             # Constant parameters for work function
    compute_groups: NotRequired[list[ComputeGroup]]     # DCP compute groups
    job_dependencies: NotRequired[list[str]]            # Required local dependencies
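
A TypedDict is enforced only by static type checkers; at runtime a config is an ordinary dict, so a missing key is not reported when the dict is built. The following is a minimal sketch of a runtime check. It uses a stand-in class (SketchJobConfig, a hypothetical name) with a subset of the fields above instead of importing PDCP's own JobConfig, and splits required and optional keys across two classes so that it also runs on Python 3.10.

```python
from typing import Any, Callable, TypedDict


class _RequiredKeys(TypedDict):
    # Keys that every config must provide.
    name: str
    work_function: Callable[..., Any]


class SketchJobConfig(_RequiredKeys, total=False):
    # Optional keys; total=False marks them as not required.
    slices: list
    stream_slices: bool
    constant_params: list


def missing_keys(config):
    """Return the required keys absent from config, sorted by name."""
    return sorted(SketchJobConfig.__required_keys__ - set(config))


print(missing_keys({"name": "test"}))  # ['work_function']
```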

Event Handling

Jobs support various events that can be subscribed to:

from pdcp.types import EventHandler

# Placeholder callback; replace with your own handler
def handler(event):
    print(event)

events: EventHandler = {
    "readystatechange": handler,    # Job state changes
    "accepted": handler,            # Job accepted by DCP
    "result": handler,              # New result available
    "complete": handler,            # Job completed
    "console": handler,             # Console output
    "status": handler               # Status updates
}

job.subscribe_to(events)
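
When the same kind of handler is wanted for several events, the dictionary can be built programmatically. The sketch below does not use PDCP; make_recording_handlers is a hypothetical helper. It assumes each handler receives a single event argument, as the lambdas in the Quick Start do. Each handler is created inside its own function call so that it remembers its own event name.

```python
# Event names taken from the list above.
EVENT_NAMES = ("readystatechange", "accepted", "result", "complete", "console", "status")


def make_recording_handlers(log, names=EVENT_NAMES):
    """Return a dict mapping each event name to a handler that appends to log."""

    def make_handler(name):
        # A separate scope per handler binds `name` at creation time.
        def handler(event):
            log.append((name, event))

        return handler

    return {name: make_handler(name) for name in names}


log = []
handlers = make_recording_handlers(log)
handlers["result"]("fake result event")  # simulate one event locally
print(log)  # [('result', 'fake result event')]
```

A dictionary built this way has the same shape as the events dictionary above, so it could presumably be passed to job.subscribe_to in the same manner.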

Job Chaining

Basic job chaining is supported through the chain method:

# Create and chain jobs
job1 = Job(config1)
job2 = job1.chain(config2)

job1.dispatch()

Compute Groups

To run a job in specific DCP compute groups, list each group's join key and join secret under compute_groups:

config: JobConfig = {
    "name": "my_job",
    "work_function": work,
    "compute_groups": [
        {
            "joinKey": "group_name",
            "joinSecret": "secret"
        }
    ]
}
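
Join secrets are credentials, so it may be preferable not to hard-code them in source files. The following is a minimal sketch that reads them from environment variables. The variable names DCP_JOIN_KEY and DCP_JOIN_SECRET and the helper itself are hypothetical and not part of PDCP or DCP; only the joinKey and joinSecret keys come from the example above.

```python
import os


def compute_group_from_env(env=os.environ,
                           key_var="DCP_JOIN_KEY",
                           secret_var="DCP_JOIN_SECRET"):
    """Build one compute group entry from two environment variables."""
    missing = [var for var in (key_var, secret_var) if not env.get(var)]
    if missing:
        raise KeyError(f"missing environment variables: {', '.join(missing)}")
    return {"joinKey": env[key_var], "joinSecret": env[secret_var]}


# Usage with an explicit mapping instead of the real environment:
group = compute_group_from_env({"DCP_JOIN_KEY": "group_name", "DCP_JOIN_SECRET": "secret"})
print(group["joinKey"])  # group_name
```

The returned dictionary can then be placed in the compute_groups list of a job configuration.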

Streaming Slices

Set stream_slices to True to process data continuously; more slices can then be added after the job is created:

config: JobConfig = {
    "name": "streaming_job",
    "work_function": work,
    "stream_slices": True,  # Enable streaming
    "slices": initial_slices
}

job = Job(config)
# Add more slices later
job.add_slices(new_slices)
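
When the input arrives as a long or unbounded iterable, it can be cut into fixed-size batches, each of which would be handed to job.add_slices. The sketch below shows only the batching step and does not use PDCP; batches is a hypothetical helper, and the batch size is an arbitrary choice.

```python
from itertools import islice


def batches(iterable, size):
    """Yield successive lists of at most `size` items from iterable."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


for batch in batches(range(5), 2):
    # In a real streaming job this is where job.add_slices(batch) would go.
    print(batch)
```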

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the MIT License - see the LICENSE file for details.
