Good Kiwi Common Library

good-common

A small set of common dependencies for Good Kiwi.

Dependency Provider

BaseProvider is a base class for building fast_depends dependency providers, so providers built on it are compatible with FastAPI and FastStream.

# BaseProvider is provided by good_common; import it before running this example.
class APIClient:
    def __init__(self, api_key: str):
        self.api_key = api_key

    def get(self, url: str):
        return f"GET {url} with {self.api_key}"

class APIClientProvider(BaseProvider[APIClient], APIClient):
    pass


from fast_depends import inject

@inject
def some_task(
    api_client: APIClient = APIClientProvider(api_key="1234"),
):
    return api_client.get("https://example.com")

A provider can also be used without fast_depends by calling get() on it directly:

client = APIClientProvider(api_key="1234").get()
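
The idea of capturing constructor arguments up front and building the dependency only when it is requested can be sketched with the standard library alone. LazyProvider below is a hypothetical stand-in written for illustration; it is not BaseProvider and makes no claim about how good_common implements it.

```python
from typing import Any, Callable, Generic, TypeVar

T = TypeVar("T")


class LazyProvider(Generic[T]):
    """Hypothetical stand-in: stores arguments now, builds the object later."""

    def __init__(self, factory: Callable[..., T], *args: Any, **kwargs: Any) -> None:
        self._factory = factory
        self._args = args
        self._kwargs = kwargs

    def get(self) -> T:
        # Construct the dependency only at the moment it is requested.
        return self._factory(*self._args, **self._kwargs)


class APIClient:
    def __init__(self, api_key: str):
        self.api_key = api_key

    def get(self, url: str) -> str:
        return f"GET {url} with {self.api_key}"


provider = LazyProvider(APIClient, api_key="1234")
client = provider.get()
response = client.get("https://example.com")
print(response)  # GET https://example.com with 1234
```

Deferring construction this way keeps the declaration of a dependency (which class, which arguments) separate from the point where an instance is actually needed.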

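Override the initializer method to customize how the dependency class is initialized. It receives the arguments given to the provider and the keyword arguments passed to the injected function at call time, and returns the arguments used to construct the dependency.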

import typing

class APIClientProvider(BaseProvider[APIClient], APIClient):
    def initializer(
        self,
        cls_args: typing.Tuple[typing.Any, ...],  # args passed to the Provider
        cls_kwargs: typing.Dict[str, typing.Any],  # kwargs passed to the Provider
        fn_kwargs: typing.Dict[str, typing.Any],  # kwargs passed to the function at runtime
    ):
        return cls_args, {**cls_kwargs, **fn_kwargs}  # override the api_key with the one passed to the function


@inject
def some_task(
    api_key: str,
    api_client: APIClient = APIClientProvider(),
):
    return api_client.get("https://example.com")


some_task(api_key="5678")
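
The merge in the initializer above relies on ordinary dictionary unpacking: when the same key appears in both dictionaries, the later one wins. A minimal sketch of that precedence, using a free function in place of the method (merge_initializer and the timeout key are hypothetical, added only for illustration):

```python
from typing import Any, Dict, Tuple


def merge_initializer(
    cls_args: Tuple[Any, ...],
    cls_kwargs: Dict[str, Any],
    fn_kwargs: Dict[str, Any],
) -> Tuple[Tuple[Any, ...], Dict[str, Any]]:
    # Later unpacking wins, so call-time values override provider defaults.
    return cls_args, {**cls_kwargs, **fn_kwargs}


args, kwargs = merge_initializer(
    (),
    {"api_key": "1234", "timeout": 5},  # values given when the provider was declared
    {"api_key": "5678"},                # values passed to the function at call time
)
print(kwargs)  # {'api_key': '5678', 'timeout': 5}
```

Swapping the order to {**fn_kwargs, **cls_kwargs} would make the provider's own arguments take priority instead.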

Pipeline

Overview

The Pipeline library creates and executes pipelines of components, where each component is a plain Python function. It supports both synchronous and asynchronous execution, type checking, parallel processing, and error handling.

Features

  • Create pipelines with multiple components that can accept multiple inputs and produce multiple outputs
  • Typed "channels" for passing data between components
  • Support for both synchronous and asynchronous components
  • Type checking for inputs and outputs using Python type annotations
  • Parallel execution of pipeline instances
  • Error handling with Result types
  • Function mapping for flexible component integration
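
To illustrate how a named, typed output can be declared through a return annotation, the sketch below reads Annotated metadata with the standard typing helpers. ChannelName is a hypothetical stand-in for the library's Attribute marker, and describe_output is an illustrative helper; neither reflects how good_common is implemented.

```python
from dataclasses import dataclass
from typing import Annotated, get_args, get_origin, get_type_hints


@dataclass(frozen=True)
class ChannelName:
    """Hypothetical marker naming the channel a return value is written to."""
    name: str


def add(a: int, b: int) -> Annotated[int, ChannelName("result")]:
    return a + b


def describe_output(fn):
    # include_extras=True keeps the Annotated metadata instead of stripping it.
    return_hint = get_type_hints(fn, include_extras=True)["return"]
    if get_origin(return_hint) is not Annotated:
        raise TypeError(f"{fn.__name__} has no Annotated return type")
    value_type, *metadata = get_args(return_hint)
    channel = next(m for m in metadata if isinstance(m, ChannelName))
    return channel.name, value_type


output_name, output_type = describe_output(add)
print(output_name, output_type)  # result <class 'int'>
```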

Quick Start

import asyncio
from typing import Annotated
from good_common.pipeline import Pipeline, Attribute

def add(a: int, b: int) -> Annotated[int, Attribute("result")]:
    return a + b

def multiply(result: int, factor: int) -> Annotated[int, Attribute("result")]:
    return result * factor

# Create a pipeline
my_pipeline = Pipeline(add, multiply)

async def main():
    # Execute the pipeline; the call is awaited, so it must run inside a coroutine
    result = await my_pipeline(a=2, b=3, factor=4)
    print(result.result)  # Output: 20

asyncio.run(main())
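
The way values flow between components by name can be sketched without the library: keep a dictionary of named values, call each function with the entries matching its parameter names, and store the return value under an output name. run_chain is a hypothetical helper for illustration and passes output names explicitly rather than reading them from annotations.

```python
import inspect
from typing import Any, Callable, Dict, List, Tuple


def run_chain(steps: List[Tuple[Callable[..., Any], str]], **channels: Any) -> Dict[str, Any]:
    """Run each (function, output_name) step against a shared dict of named values."""
    for fn, output_name in steps:
        # Pick out only the values this function declares as parameters.
        params = inspect.signature(fn).parameters
        kwargs = {name: channels[name] for name in params}
        channels[output_name] = fn(**kwargs)
    return channels


def add(a: int, b: int) -> int:
    return a + b


def multiply(result: int, factor: int) -> int:
    return result * factor


final = run_chain([(add, "result"), (multiply, "result")], a=2, b=3, factor=4)
print(final["result"])  # 20
```

Here add writes 5 to "result", and multiply reads it back together with factor, which matches the arithmetic of the Quick Start example.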

Usage

Creating a Pipeline

Use the Pipeline class to create a new pipeline:

from good_common.pipeline import Pipeline

my_pipeline = Pipeline(component1, component2, component3)

Defining Components

Components can be synchronous or asynchronous functions:

import asyncio
from typing import Annotated
from good_common.pipeline import Attribute

def sync_component(x: int) -> Annotated[int, Attribute("result")]:
    return x + 1

async def async_component(x: int) -> Annotated[int, Attribute("result")]:
    await asyncio.sleep(0.1)
    return x * 2
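
One common way to treat synchronous and asynchronous components uniformly is to call the function and await the result only if it is awaitable. The sketch below shows that technique with the standard library; call_component is a hypothetical helper, and this is not necessarily how Pipeline handles mixed components.

```python
import asyncio
import inspect
from typing import Any, Callable


async def call_component(fn: Callable[..., Any], **kwargs: Any) -> Any:
    value = fn(**kwargs)
    # Coroutine functions return an awaitable; plain functions return the value itself.
    if inspect.isawaitable(value):
        value = await value
    return value


def sync_component(x: int) -> int:
    return x + 1


async def async_component(x: int) -> int:
    await asyncio.sleep(0)
    return x * 2


async def main():
    first = await call_component(sync_component, x=5)
    second = await call_component(async_component, x=first)
    return first, second


first, second = asyncio.run(main())
print(first, second)  # 6 12
```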

Executing a Pipeline

Calling a pipeline is awaited, so execute it from inside an async function (or from an event loop started with asyncio.run):

result = await my_pipeline(x=5)
print(result.result)

Parallel Execution

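Execute a pipeline over multiple inputs in parallel. The execute method takes one dictionary of keyword arguments per run and yields one Result per input through an async iterator: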

inputs = [{"a": 1, "b": 2, "factor": 2}, {"a": 2, "b": 3, "factor": 3}]
results = [result async for result in my_pipeline.execute(*inputs, max_workers=3)]

for result in results:
    if result.is_ok():
        print(result.unwrap().result)
    else:
        print(f"Error: {result.unwrap_err()}")
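
A worker limit like max_workers is often implemented with a semaphore that caps how many coroutines run at once. The sketch below shows that general technique with asyncio; run_bounded and add_then_scale are hypothetical names, and the code is an assumption about the approach, not the library's implementation.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List


async def run_bounded(
    worker: Callable[..., Awaitable[Any]],
    inputs: List[Dict[str, Any]],
    max_workers: int = 3,
) -> AsyncIterator[Any]:
    semaphore = asyncio.Semaphore(max_workers)

    async def guarded(kwargs: Dict[str, Any]) -> Any:
        # At most max_workers coroutines are inside this block at any time.
        async with semaphore:
            return await worker(**kwargs)

    tasks = [asyncio.create_task(guarded(kwargs)) for kwargs in inputs]
    # Yield results in completion order, not input order.
    for finished in asyncio.as_completed(tasks):
        yield await finished


async def add_then_scale(a: int, b: int, factor: int) -> int:
    await asyncio.sleep(0)
    return (a + b) * factor


async def main() -> List[int]:
    inputs = [{"a": 1, "b": 2, "factor": 2}, {"a": 2, "b": 3, "factor": 3}]
    return [value async for value in run_bounded(add_then_scale, inputs, max_workers=3)]


values = asyncio.run(main())
print(sorted(values))  # [6, 15]
```

Because results are yielded as they complete, callers that need input order should carry an index or key alongside each input.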

Error Handling

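In parallel execution, an exception raised by a component is reported as an error Result for that input, so one failing input does not prevent the others from being processed: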

def faulty_component(x: int) -> Annotated[int, Attribute("result")]:
    if x == 2:
        raise ValueError("Error on purpose!")
    return x + 1

pipeline = Pipeline(faulty_component)
inputs = [{"x": 1}, {"x": 2}, {"x": 3}]
results = [result async for result in pipeline.execute(*inputs)]

for result in results:
    if result.is_ok():
        print(result.unwrap().result)
    else:
        print(f"Error: {result.unwrap_err()}")
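
The Result pattern used above can be sketched in a few lines: wrap each call so that a return value becomes an Ok and an exception becomes an Err, then inspect each outcome. Ok, Err, and capture are hypothetical minimal definitions written for illustration; they are not the Result types the library uses.

```python
from dataclasses import dataclass
from typing import Any, Callable, Union


@dataclass
class Ok:
    value: Any

    def is_ok(self) -> bool:
        return True

    def unwrap(self) -> Any:
        return self.value


@dataclass
class Err:
    error: Exception

    def is_ok(self) -> bool:
        return False

    def unwrap_err(self) -> Exception:
        return self.error


def capture(fn: Callable[..., Any], **kwargs: Any) -> Union[Ok, Err]:
    # Convert an exception into a value so one failure does not stop the batch.
    try:
        return Ok(fn(**kwargs))
    except Exception as exc:
        return Err(exc)


def faulty_component(x: int) -> int:
    if x == 2:
        raise ValueError("Error on purpose!")
    return x + 1


results = [capture(faulty_component, x=x) for x in (1, 2, 3)]
summary = [r.unwrap() if r.is_ok() else f"Error: {r.unwrap_err()}" for r in results]
print(summary)  # [2, 'Error: Error on purpose!', 4]
```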

Function Mapping

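Use function_mapper to adapt a component whose parameter names differ from the names of the values flowing through the pipeline:

from typing import Annotated
from good_common.pipeline import Pipeline, Attribute, function_mapper

def multiply_diff(difference: int, factor: int) -> Annotated[int, Attribute("result")]:
    return difference * factor

# subtract is assumed to be a component defined elsewhere that outputs a "diff" value;
# the mapping feeds that value into multiply_diff's "difference" parameter
pipeline = Pipeline(subtract, function_mapper(multiply_diff, diff="difference"))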
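
Renaming keyword arguments before they reach a function can be sketched with a small wrapper. rename_kwargs is a hypothetical helper that mirrors the call shape shown above (incoming name on the left, the function's parameter name on the right); it is an illustration of the idea, not the library's function_mapper.

```python
import functools
from typing import Any, Callable


def rename_kwargs(fn: Callable[..., Any], **mapping: str) -> Callable[..., Any]:
    """Return a wrapper that renames incoming keyword arguments before calling fn."""

    @functools.wraps(fn)
    def wrapper(**kwargs: Any) -> Any:
        # Names without a mapping entry are passed through unchanged.
        renamed = {mapping.get(name, name): value for name, value in kwargs.items()}
        return fn(**renamed)

    return wrapper


def multiply_diff(difference: int, factor: int) -> int:
    return difference * factor


mapped = rename_kwargs(multiply_diff, diff="difference")
product = mapped(diff=3, factor=4)
print(product)  # 12
```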

Advanced Features

  • Mixed synchronous and asynchronous components in a single pipeline
  • Custom output types with Attribute annotations
  • Flexible error handling in both single and parallel executions

Utilities

Various utility functions for common tasks.

See /tests/good_common/utilities in the repository for usage examples.

Project details

Version 0.4.0. No source distribution is available for this release; the following built distributions (wheels) are published, all for CPython 3.13:

  • good_common-0.4.0-cp313-cp313-win_amd64.whl (502.2 kB): Windows x86-64
  • good_common-0.4.0-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl (1.3 MB): manylinux, glibc 2.17+ and glibc 2.28+, x86-64
  • good_common-0.4.0-cp313-cp313-macosx_11_0_arm64.whl (516.4 kB): macOS 11.0+ ARM64
  • good_common-0.4.0-cp313-cp313-macosx_10_13_x86_64.whl (519.3 kB): macOS 10.13+ x86-64
  • good_common-0.4.0-cp313-cp313-macosx_10_13_universal2.whl (658.3 kB): macOS 10.13+ universal2 (ARM64, x86-64)
