Good Kiwi Common Library

good-common

A small set of common dependencies for Good Kiwi.

Dependency Provider

BaseProvider is a base class for building fast_depends dependency providers, which makes them compatible with FastAPI and FastStream.

class APIClient:
    def __init__(self, api_key: str):
        self.api_key = api_key

    def get(self, url: str):
        return f"GET {url} with {self.api_key}"

class APIClientProvider(BaseProvider[APIClient], APIClient):
    pass


from fast_depends import inject

@inject
def some_task(
    api_client: APIClient = APIClientProvider(api_key="1234"),
):
    return api_client.get("https://example.com")

A provider can also be used without fast_depends:

client = APIClientProvider(api_key="1234").get()

Override the initializer method to customize how the dependency class is constructed.

import typing

class APIClientProvider(BaseProvider[APIClient], APIClient):
    def initializer(
        self,
        cls_args: typing.Tuple[typing.Any, ...],  # args passed to the Provider
        cls_kwargs: typing.Dict[str, typing.Any],  # kwargs passed to the Provider
        fn_kwargs: typing.Dict[str, typing.Any],  # kwargs passed to the function at runtime
    ):
        return cls_args, {**cls_kwargs, **fn_kwargs}  # override the api_key with the one passed to the function


@inject
def some_task(
    api_key: str,
    api_client: APIClient = APIClientProvider(),
):
    return api_client.get("https://example.com")


some_task(api_key="5678")

Pipeline

Overview

The Pipeline library composes plain Python functions ("components") into pipelines. It supports synchronous and asynchronous components, type checking of inputs and outputs, parallel execution, and error handling.

Features

  • Create pipelines with multiple components that can accept multiple inputs and produce multiple outputs
  • Typed "channels" for passing data between components
  • Support for both synchronous and asynchronous components
  • Type checking for inputs and outputs using Python type annotations
  • Parallel execution of pipeline instances
  • Error handling with Result types
  • Function mapping for flexible component integration

Quick Start

import asyncio
from typing import Annotated
from good_common.pipeline import Pipeline, Attribute

def add(a: int, b: int) -> Annotated[int, Attribute("result")]:
    return a + b

def multiply(result: int, factor: int) -> Annotated[int, Attribute("result")]:
    return result * factor

# Create a pipeline; add's "result" output feeds multiply's result parameter
my_pipeline = Pipeline(add, multiply)

# Pipelines are awaited, so execute the pipeline inside a coroutine
async def main():
    result = await my_pipeline(a=2, b=3, factor=4)
    print(result.result)  # Output: 20

asyncio.run(main())
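The Quick Start works because each component writes its return value to a named channel and reads its parameters from channels of the same name. The stdlib-only sketch below illustrates that routing idea. The Attribute class and run_toy_pipeline function here are hypothetical stand-ins, not the library's own code, and the sketch is synchronous with no type checking.

```python
import inspect
from dataclasses import dataclass
from typing import Annotated, Any, Callable, Dict, get_type_hints


@dataclass(frozen=True)
class Attribute:
    """Hypothetical marker naming the channel a return value is written to."""
    name: str


def output_channel(fn: Callable[..., Any]) -> str:
    # Read the channel name from the Annotated[...] return annotation.
    hints = get_type_hints(fn, include_extras=True)
    for meta in getattr(hints["return"], "__metadata__", ()):
        if isinstance(meta, Attribute):
            return meta.name
    raise TypeError(f"{fn.__name__} has no Attribute on its return type")


def run_toy_pipeline(*components: Callable[..., Any], **inputs: Any) -> Dict[str, Any]:
    channels = dict(inputs)
    for fn in components:
        # Feed each parameter from the channel that shares its name.
        kwargs = {name: channels[name] for name in inspect.signature(fn).parameters}
        # Store the return value under the channel named in the annotation.
        channels[output_channel(fn)] = fn(**kwargs)
    return channels


def add(a: int, b: int) -> Annotated[int, Attribute("result")]:
    return a + b


def multiply(result: int, factor: int) -> Annotated[int, Attribute("result")]:
    return result * factor


channels = run_toy_pipeline(add, multiply, a=2, b=3, factor=4)
print(channels["result"])
```

Here add writes 5 to the "result" channel, and multiply then reads that channel and overwrites it with 20.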

Usage

Creating a Pipeline

Use the Pipeline class to create a new pipeline:

from good_common.pipeline import Pipeline

my_pipeline = Pipeline(component1, component2, component3)

Defining Components

Components can be synchronous or asynchronous functions:

import asyncio
from typing import Annotated
from good_common.pipeline import Attribute

def sync_component(x: int) -> Annotated[int, Attribute("result")]:
    return x + 1

async def async_component(x: int) -> Annotated[int, Attribute("result")]:
    await asyncio.sleep(0.1)
    return x * 2

Executing a Pipeline

A pipeline is awaitable. Call it from inside a coroutine:

result = await my_pipeline(x=5)
print(result.result)

Parallel Execution

Execute a pipeline with multiple inputs in parallel:

inputs = [{"a": 1, "b": 2, "factor": 2}, {"a": 2, "b": 3, "factor": 3}]
results = [result async for result in my_pipeline.execute(*inputs, max_workers=3)]

for result in results:
    if result.is_ok():
        print(result.unwrap().result)
    else:
        print(f"Error: {result.unwrap_err()}")
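One common way to cap concurrency in asyncio code is a semaphore. The sketch below shows that general technique with a hypothetical run_bounded helper and a hypothetical add_then_scale worker. It is not how Pipeline.execute is implemented, and unlike execute it returns a list rather than an async iterator.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


async def run_bounded(
    worker: Callable[..., Awaitable[Any]],
    inputs: List[Dict[str, Any]],
    max_workers: int,
) -> List[Any]:
    # At most max_workers coroutines may hold the semaphore at once.
    semaphore = asyncio.Semaphore(max_workers)

    async def run_one(kwargs: Dict[str, Any]) -> Any:
        async with semaphore:
            return await worker(**kwargs)

    # gather preserves the order of the inputs in its result list.
    return await asyncio.gather(*(run_one(kwargs) for kwargs in inputs))


async def add_then_scale(a: int, b: int, factor: int) -> int:
    await asyncio.sleep(0)  # yield control, as real I/O would
    return (a + b) * factor


inputs = [{"a": 1, "b": 2, "factor": 2}, {"a": 2, "b": 3, "factor": 3}]
totals = asyncio.run(run_bounded(add_then_scale, inputs, max_workers=3))
```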

Error Handling

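In parallel execution, a component that raises does not abort the whole run. Each input yields a Result that is checked with is_ok() and read with unwrap() or unwrap_err():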

def faulty_component(x: int) -> Annotated[int, Attribute("result")]:
    if x == 2:
        raise ValueError("Error on purpose!")
    return x + 1

pipeline = Pipeline(faulty_component)
inputs = [{"x": 1}, {"x": 2}, {"x": 3}]
results = [result async for result in pipeline.execute(*inputs)]

for result in results:
    if result.is_ok():
        print(result.unwrap().result)
    else:
        print(f"Error: {result.unwrap_err()}")
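The pattern above, capturing each failure as a value instead of letting it propagate, can be sketched without the library. The Ok and Err classes and the run_each helper below are hypothetical and deliberately minimal; the Result type actually used by the pipeline may differ.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Union


@dataclass
class Ok:
    value: Any

    def is_ok(self) -> bool:
        return True


@dataclass
class Err:
    error: Exception

    def is_ok(self) -> bool:
        return False


def run_each(
    fn: Callable[..., Any], inputs: List[Dict[str, Any]]
) -> List[Union[Ok, Err]]:
    results: List[Union[Ok, Err]] = []
    for kwargs in inputs:
        try:
            results.append(Ok(fn(**kwargs)))
        except Exception as exc:
            # One failing input is recorded and the loop moves on.
            results.append(Err(exc))
    return results


def faulty_component(x: int) -> int:
    if x == 2:
        raise ValueError("Error on purpose!")
    return x + 1


results = run_each(faulty_component, [{"x": 1}, {"x": 2}, {"x": 3}])
```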

Function Mapping

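Use function_mapper to rename inputs when a channel name does not match a component's parameter name:

from typing import Annotated
from good_common.pipeline import Pipeline, Attribute, function_mapper

# Illustrative upstream component; it writes to the "diff" channel
def subtract(a: int, b: int) -> Annotated[int, Attribute("diff")]:
    return a - b

def multiply_diff(difference: int, factor: int) -> Annotated[int, Attribute("result")]:
    return difference * factor

# Map the "diff" channel onto multiply_diff's difference parameter
pipeline = Pipeline(subtract, function_mapper(multiply_diff, diff="difference"))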
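Renaming keyword arguments before a call is a small wrapper. The sketch below shows the idea with a hypothetical toy_function_mapper. It assumes the mapping reads as incoming name to parameter name, as the diff="difference" call suggests, and it is not the library's function_mapper.

```python
import functools
from typing import Any, Callable


def toy_function_mapper(fn: Callable[..., Any], **mapping: str) -> Callable[..., Any]:
    """Wrap fn so incoming keyword names are renamed (incoming -> parameter)."""

    @functools.wraps(fn)
    def wrapper(**kwargs: Any) -> Any:
        # Names without a mapping entry pass through unchanged.
        renamed = {mapping.get(name, name): value for name, value in kwargs.items()}
        return fn(**renamed)

    return wrapper


def multiply_diff(difference: int, factor: int) -> int:
    return difference * factor


mapped = toy_function_mapper(multiply_diff, diff="difference")
product = mapped(diff=3, factor=4)
```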

Advanced Features

  • Mixed synchronous and asynchronous components in a single pipeline
  • Custom output types with Attribute annotations
  • Flexible error handling in both single and parallel executions

Utilities

Various utility functions for common tasks.

See /tests/good_common/utilities for usage examples.

This version

0.6.2
