Good Kiwi Common Library
good-common
A small set of common dependencies for Good Kiwi.
Dependency Provider
BaseProvider is a base class for creating fast_depends dependency providers, which also makes them compatible with FastAPI and FastStream.
class APIClient:
    def __init__(self, api_key: str):
        self.api_key = api_key

    def get(self, url: str):
        return f"GET {url} with {self.api_key}"

class APIClientProvider(BaseProvider[APIClient], APIClient):
    pass

from fast_depends import inject

@inject
def some_task(
    api_client: APIClient = APIClientProvider(api_key="1234"),
):
    return api_client.get("https://example.com")
The provider can also be used without fast_depends:
client = APIClientProvider(api_key="1234").get()
Override initializer to customize how the dependency class is initialized:
import typing

class APIClientProvider(BaseProvider[APIClient], APIClient):
    def initializer(
        self,
        cls_args: typing.Tuple[typing.Any, ...],  # args passed to the Provider
        cls_kwargs: typing.Dict[str, typing.Any],  # kwargs passed to the Provider
        fn_kwargs: typing.Dict[str, typing.Any],  # kwargs passed to the function at runtime
    ):
        # fn_kwargs are unpacked last, so the api_key passed to the function wins
        return cls_args, {**cls_kwargs, **fn_kwargs}

@inject
def some_task(
    api_key: str,
    api_client: APIClient = APIClientProvider(),
):
    return api_client.get("https://example.com")

some_task(api_key="5678")
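The initializer above relies on the order of dict unpacking: when the same key appears in both mappings, the one unpacked last wins. A minimal standalone sketch of just that merge, independent of BaseProvider (merge_kwargs is a hypothetical helper name used only for illustration):

```python
from typing import Any, Dict


def merge_kwargs(
    cls_kwargs: Dict[str, Any],
    fn_kwargs: Dict[str, Any],
) -> Dict[str, Any]:
    """Hypothetical helper: combine provider kwargs with call-time kwargs.

    Keys in fn_kwargs take precedence because they are unpacked last.
    """
    return {**cls_kwargs, **fn_kwargs}


# The provider was built with one key, the function is called with another.
merged = merge_kwargs({"api_key": "1234"}, {"api_key": "5678"})
print(merged)  # {'api_key': '5678'}
```

Swapping the two unpacking operands would reverse the precedence, so the values given to the provider would override the ones passed at call time.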
Pipeline
Overview
The Pipeline library provides a flexible and efficient way to create and execute pipelines of components in Python. It supports both synchronous and asynchronous execution, type checking, parallel processing, and error handling.
Features
- Create pipelines with multiple components that can accept multiple inputs and produce multiple outputs
- Typed "channels" for passing data between components
- Support for both synchronous and asynchronous components
- Type checking for inputs and outputs using Python type annotations
- Parallel execution of pipeline instances
- Error handling with Result types
- Function mapping for flexible component integration
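To illustrate how named outputs and type checking can be driven by annotations, the sketch below reads an output name from an Annotated return type and validates keyword arguments against a function's type hints. It uses only the standard library. Attr, describe_output, and check_inputs are hypothetical stand-ins and are not the library's own classes or functions.

```python
from dataclasses import dataclass
from typing import Annotated, get_args, get_origin, get_type_hints


@dataclass(frozen=True)
class Attr:
    """Hypothetical stand-in for an output-name marker (not the library's class)."""
    name: str


def add(a: int, b: int) -> Annotated[int, Attr("result")]:
    return a + b


def describe_output(fn):
    """Return (output name, output type) read from the return annotation."""
    hints = get_type_hints(fn, include_extras=True)  # keep Annotated metadata
    ret = hints["return"]
    if get_origin(ret) is not Annotated:
        raise TypeError(f"{fn.__name__} has no Annotated return type")
    base_type, *metadata = get_args(ret)
    marker = next(m for m in metadata if isinstance(m, Attr))
    return marker.name, base_type


def check_inputs(fn, **kwargs):
    """Raise TypeError if a keyword argument does not match its annotation.

    Only plain classes such as int or str are handled in this sketch.
    """
    hints = get_type_hints(fn)  # Annotated metadata is stripped here
    for key, value in kwargs.items():
        expected = hints[key]
        if not isinstance(value, expected):
            raise TypeError(f"{key} must be {expected.__name__}")


print(describe_output(add))  # ('result', <class 'int'>)
check_inputs(add, a=1, b=2)  # passes silently
```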
Quick Start
import asyncio
from typing import Annotated
from good_common.pipeline import Pipeline, Attribute

def add(a: int, b: int) -> Annotated[int, Attribute("result")]:
    return a + b

def multiply(result: int, factor: int) -> Annotated[int, Attribute("result")]:
    return result * factor

# Create a pipeline
my_pipeline = Pipeline(add, multiply)

async def main():
    # Execute the pipeline; calling it returns an awaitable
    result = await my_pipeline(a=2, b=3, factor=4)
    print(result.result)  # Output: 20

asyncio.run(main())
Usage
Creating a Pipeline
Use the Pipeline class to create a new pipeline:
from good_common.pipeline import Pipeline

my_pipeline = Pipeline(component1, component2, component3)
Defining Components
Components can be synchronous or asynchronous functions:
import asyncio
from typing import Annotated
from good_common.pipeline import Attribute

def sync_component(x: int) -> Annotated[int, Attribute("result")]:
    return x + 1

async def async_component(x: int) -> Annotated[int, Attribute("result")]:
    await asyncio.sleep(0.1)
    return x * 2
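One common way to let a runner accept both kinds of component is to call the function and await the result only when it is awaitable. The sketch below shows that technique with the standard library. call_component is a hypothetical helper, not part of the Pipeline API, and the library may mix the two kinds differently.

```python
import asyncio
import inspect
from typing import Any, Callable


async def call_component(fn: Callable[..., Any], **kwargs: Any) -> Any:
    """Call fn and await the result only if it is awaitable."""
    value = fn(**kwargs)
    if inspect.isawaitable(value):
        value = await value
    return value


def sync_component(x: int) -> int:
    return x + 1


async def async_component(x: int) -> int:
    await asyncio.sleep(0)  # yield to the event loop
    return x * 2


async def main() -> int:
    x = await call_component(sync_component, x=5)  # 5 + 1
    return await call_component(async_component, x=x)  # 6 * 2


print(asyncio.run(main()))  # 12
```

In this sketch a synchronous component runs directly on the event loop thread, so a slow one would block other tasks. Offloading it with asyncio.to_thread is one possible alternative.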
Executing a Pipeline
Calling a pipeline returns an awaitable, so execute it from within an async function:
result = await my_pipeline(x=5)
print(result.result)
Parallel Execution
Execute a pipeline with multiple inputs in parallel (from within an async function):
inputs = [{"a": 1, "b": 2, "factor": 2}, {"a": 2, "b": 3, "factor": 3}]
results = [result async for result in my_pipeline.execute(*inputs, max_workers=3)]
for result in results:
    if result.is_ok():
        print(result.unwrap().result)
    else:
        print(f"Error: {result.unwrap_err()}")
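A worker limit such as max_workers can be understood as a cap on how many inputs are processed at once. The sketch below shows one way to get that behavior with asyncio.Semaphore. run_bounded and scaled_sum are hypothetical names, and this is not a description of how Pipeline.execute is implemented.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


async def run_bounded(
    fn: Callable[..., Awaitable[Any]],
    inputs: List[Dict[str, Any]],
    max_workers: int,
) -> List[Any]:
    """Run fn once per input dict, at most max_workers at a time."""
    semaphore = asyncio.Semaphore(max_workers)

    async def worker(kwargs: Dict[str, Any]) -> Any:
        async with semaphore:  # wait here while max_workers calls are in flight
            return await fn(**kwargs)

    # gather returns results in the same order as the inputs
    return await asyncio.gather(*(worker(kw) for kw in inputs))


async def scaled_sum(a: int, b: int, factor: int) -> int:
    await asyncio.sleep(0)  # stand-in for real asynchronous work
    return (a + b) * factor


inputs = [{"a": 1, "b": 2, "factor": 2}, {"a": 2, "b": 3, "factor": 3}]
print(asyncio.run(run_bounded(scaled_sum, inputs, max_workers=3)))  # [6, 15]
```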
Error Handling
In parallel execution each input yields a Result, so a failure for one input is reported as an error value alongside the successful results:
def faulty_component(x: int) -> Annotated[int, Attribute("result")]:
    if x == 2:
        raise ValueError("Error on purpose!")
    return x + 1

pipeline = Pipeline(faulty_component)
inputs = [{"x": 1}, {"x": 2}, {"x": 3}]
results = [result async for result in pipeline.execute(*inputs)]
for result in results:
    if result.is_ok():
        print(result.unwrap().result)
    else:
        print(f"Error: {result.unwrap_err()}")
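For readers unfamiliar with the pattern, a Result wraps either a success value or an error so the caller can branch on is_ok() instead of catching exceptions. The sketch below is a minimal, hypothetical version of such a type. Ok, Err, and capture are illustrative names and are not the classes the library returns.

```python
from dataclasses import dataclass
from typing import Any, Callable, Generic, TypeVar, Union

T = TypeVar("T")


@dataclass(frozen=True)
class Ok(Generic[T]):
    """Successful outcome holding a value."""
    value: T

    def is_ok(self) -> bool:
        return True

    def unwrap(self) -> T:
        return self.value


@dataclass(frozen=True)
class Err:
    """Failed outcome holding the exception that was raised."""
    error: Exception

    def is_ok(self) -> bool:
        return False

    def unwrap_err(self) -> Exception:
        return self.error


def capture(fn: Callable[..., T], **kwargs: Any) -> Union[Ok[T], Err]:
    """Run fn and convert a raised exception into an Err value."""
    try:
        return Ok(fn(**kwargs))
    except Exception as exc:
        return Err(exc)


def faulty(x: int) -> int:
    if x == 2:
        raise ValueError("Error on purpose!")
    return x + 1


# One failing input does not prevent the others from producing results.
results = [capture(faulty, x=x) for x in (1, 2, 3)]
for r in results:
    print(r.unwrap() if r.is_ok() else f"Error: {r.unwrap_err()}")
```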
Function Mapping
Use function_mapper to adjust input parameter names:
from good_common.pipeline import function_mapper

def multiply_diff(difference: int, factor: int) -> Annotated[int, Attribute("result")]:
    return difference * factor

# subtract is assumed to be a component defined elsewhere
pipeline = Pipeline(subtract, function_mapper(multiply_diff, diff="difference"))
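The idea behind parameter mapping can be sketched as a wrapper that renames keyword arguments before calling the wrapped function. The sketch assumes each mapping reads incoming_name="parameter_name", which matches the call above only by assumption. rename_kwargs is a hypothetical helper and not the library's function_mapper.

```python
import functools
from typing import Any, Callable


def rename_kwargs(fn: Callable[..., Any], **mapping: str) -> Callable[..., Any]:
    """Hypothetical wrapper: each mapping entry is incoming_name="parameter_name"."""

    @functools.wraps(fn)
    def wrapper(**kwargs: Any) -> Any:
        # Rename mapped keys and pass every other key through unchanged.
        renamed = {mapping.get(key, key): value for key, value in kwargs.items()}
        return fn(**renamed)

    return wrapper


def multiply_diff(difference: int, factor: int) -> int:
    return difference * factor


mapped = rename_kwargs(multiply_diff, diff="difference")
print(mapped(diff=3, factor=4))  # 12
```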
Advanced Features
- Mixed synchronous and asynchronous components in a single pipeline
- Custom output types with Attribute annotations
- Flexible error handling in both single and parallel executions
Utilities
Various utility functions for common tasks.
See /tests/good_common/utilities for usage examples.