A set of useful extensions for the Haystack AI library.
Haystack 2.x Custom Extensions
Welcome to the repository for custom extensions of Haystack, version 2.0 and onwards. This collection focuses on components that run other components concurrently, which speeds up I/O-bound tasks. It also covers subpipelines, that is, running complete pipelines as components inside another pipeline.
Quick Start Guide
To develop or create new integrations, you will need hatch. Follow the hatch installation instructions for your operating system and platform.
The integrations within this repository are designed to be self-contained, so the first step in working on an integration is to navigate (cd) into its folder. For instance, to execute the test suite for the Chroma document store, navigate into its folder and run:
$ hatch run test
Hatch will take care of setting up an isolated Python environment and running the tests.
Installation
Run:
$ pip install haystack-extensions
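To confirm that the installation succeeded, a small check with the standard library can help. This is a minimal sketch, assuming the distribution name `haystack-extensions` from the command above; `installed_version` is a hypothetical helper, not part of the package.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(distribution: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is missing."""
    try:
        return version(distribution)
    except PackageNotFoundError:
        return None


# Prints a version string when the package is installed, otherwise None.
print(installed_version("haystack-extensions"))
```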
Usage
Concurrent Components
Below is a code snippet demonstrating how to run several components in separate threads inside another component:
import time

from haystack import Pipeline, component
from haystack_extensions.components.concurrent_runner.runner import (
    ConcurrentComponentRunner,
    NamedComponent,
)


@component
class SimplePrintStringWithWaitComponent:
    """
    A component that waits for a given number of seconds and then prints a string
    """

    def __init__(self, wait_time: int = 4):
        self.wait_time = wait_time

    @component.output_types(text=str)
    def run(self, text: str) -> dict:
        time.sleep(self.wait_time)
        print(text, " <<== waited for", self.wait_time, "seconds")
        return {"text": text}


if __name__ == "__main__":
    comp1 = SimplePrintStringWithWaitComponent(wait_time=10)
    comp2 = SimplePrintStringWithWaitComponent(wait_time=3)
    comp3 = SimplePrintStringWithWaitComponent(wait_time=5)

    # Each wrapped component gets a name; its inputs are prefixed with that name.
    named_components = [
        NamedComponent("one", comp1),
        NamedComponent("two", comp2),
        NamedComponent("three", comp3),
    ]
    concurrent_component_runner = ConcurrentComponentRunner(named_components)

    p = Pipeline()
    p.add_component("concurrent_component_runner", concurrent_component_runner)

    result = p.run(
        data={
            "concurrent_component_runner": {
                "one_text": "Hello",
                "two_text": "World",
                "three_text": "!",
            }
        }
    )
    print(result)
This will lead to the following output and result:
Console output:
World <<== waited for 3 seconds
! <<== waited for 5 seconds
Hello <<== waited for 10 seconds
Result:
result = {'concurrent_component_runner': {'one_text': 'Hello', 'two_text': 'World', 'three_text': '!'}}
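The console output order follows from the wait times: all three components start together, so the one with the shortest wait prints first, and the whole run takes roughly as long as the slowest component rather than the sum of all waits. The sketch below reproduces that effect with plain standard-library threads and scaled-down wait times. It is not the implementation of `ConcurrentComponentRunner`; `wait_and_return` and `run_concurrently` are hypothetical names used only for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def wait_and_return(name: str, wait_time: float) -> str:
    """Stand-in for an I/O-bound component: sleep, then return its name."""
    time.sleep(wait_time)
    return name


def run_concurrently(waits: dict) -> tuple:
    """Run every task in its own thread; return completion order and elapsed seconds."""
    start = time.perf_counter()
    finished = []
    with ThreadPoolExecutor(max_workers=len(waits)) as executor:
        futures = [
            executor.submit(wait_and_return, name, wait)
            for name, wait in waits.items()
        ]
        # as_completed yields futures in the order in which they finish.
        for future in as_completed(futures):
            finished.append(future.result())
    return finished, time.perf_counter() - start


order, elapsed = run_concurrently({"one": 0.3, "two": 0.1, "three": 0.2})
print(order)  # ['two', 'three', 'one']
print(f"elapsed: {elapsed:.2f}s (the waits sum to 0.60s)")
```

Threads suit this case because a sleeping or I/O-waiting thread releases the interpreter, so the waits overlap instead of adding up.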
Subpipelines
import time

from haystack import Pipeline, component
from haystack_extensions.components.concurrent_runner.runner import (
    ConcurrentPipelineRunner,
    NamedPipeline,
)


@component
class SimplePrintStringWithWaitComponent:
    """
    A component that waits for a given number of seconds and then prints a string
    """

    def __init__(self, wait_time: int = 4):
        self.wait_time = wait_time

    @component.output_types(text=str)
    def run(self, text: str) -> dict:
        time.sleep(self.wait_time)
        print(text, " <<== waited for", self.wait_time, "seconds")
        return {"text": text}


if __name__ == "__main__":
    comp1 = SimplePrintStringWithWaitComponent(wait_time=10)
    comp2 = SimplePrintStringWithWaitComponent(wait_time=3)

    # Two independent pipelines, each holding one component.
    pipeline1 = Pipeline()
    pipeline1.add_component("simple_component", comp1)
    pipeline2 = Pipeline()
    pipeline2.add_component("simple_component", comp2)

    concurrent_pipeline_runner = ConcurrentPipelineRunner(
        [NamedPipeline("pipeline1", pipeline1), NamedPipeline("pipeline2", pipeline2)]
    )

    # The runner is itself a component of the enclosing pipeline.
    overall_pipeline = Pipeline()
    overall_pipeline.add_component("concurrent_pipeline_runner", concurrent_pipeline_runner)

    results = overall_pipeline.run(
        data={
            "concurrent_pipeline_runner": {
                "pipeline1": {"simple_component": {"text": "Hello"}},
                "pipeline2": {"simple_component": {"text": "World"}},
            }
        }
    )
    print(results)
This will lead to the following output and result:
Console output:
World <<== waited for 3 seconds
Hello <<== waited for 10 seconds
Result:
result = {'concurrent_pipeline_runner': {'pipeline1': {'simple_component': {'text': 'Hello'}}, 'pipeline2': {'simple_component': {'text': 'World'}}}}
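The result is nested three levels deep: runner name, then subpipeline name, then component name. A short sketch of pulling the individual outputs back out is shown here, using the result dictionary from the example output above as a literal so it runs without Haystack. `collect_texts` is a hypothetical helper and assumes every subpipeline contains a component named `simple_component`.

```python
# Result dictionary copied from the example output above.
results = {
    "concurrent_pipeline_runner": {
        "pipeline1": {"simple_component": {"text": "Hello"}},
        "pipeline2": {"simple_component": {"text": "World"}},
    }
}


def collect_texts(runner_output: dict) -> dict:
    """Map each subpipeline name to the text produced by its 'simple_component'."""
    return {
        pipeline_name: pipeline_output["simple_component"]["text"]
        for pipeline_name, pipeline_output in runner_output.items()
    }


texts = collect_texts(results["concurrent_pipeline_runner"])
print(texts)  # {'pipeline1': 'Hello', 'pipeline2': 'World'}
```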
Download files
File details
Details for the file haystack_extensions-0.0.4.tar.gz.
File metadata
- Download URL: haystack_extensions-0.0.4.tar.gz
- Upload date:
- Size: 11.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: python-httpx/0.27.0
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 5beca6d59c64fbcad4c6473593ddfeafe84818651795810616a11ca34c6bd7da |
| MD5 | b99600190a162b05352fe4505635e1d9 |
| BLAKE2b-256 | 0b7ae68e1cf60268fda6db613d8b62f87d4167bdd76397f7423aae5780763636 |
File details
Details for the file haystack_extensions-0.0.4-py3-none-any.whl.
File metadata
- Download URL: haystack_extensions-0.0.4-py3-none-any.whl
- Upload date:
- Size: 8.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: python-httpx/0.27.0
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | fe03ebf20426d29ac823c4dad657e3e6b0cf9f5cd7c2cf423a996985e414503d |
| MD5 | 998c385e876c2131dfaabe2ec0e0f934 |
| BLAKE2b-256 | c0631e03e446c0ade4d500b73bf9d64c47f88c0976a913bb412994922fe61ed7 |
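The SHA256 digests listed above can be used to check a downloaded file before installing it. The following is a minimal sketch using only the standard library; `sha256_of` and `matches` are hypothetical helpers, and the script assumes the source archive was saved in the current directory under its original file name.

```python
import hashlib
from pathlib import Path

# SHA256 digest of haystack_extensions-0.0.4.tar.gz as listed above.
EXPECTED_SDIST_SHA256 = "5beca6d59c64fbcad4c6473593ddfeafe84818651795810616a11ca34c6bd7da"


def sha256_of(path) -> str:
    """Compute the SHA256 hex digest of a file, reading it in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()


def matches(path, expected_hex: str) -> bool:
    """Return True if the file's SHA256 digest equals the expected value."""
    return sha256_of(path) == expected_hex.strip().lower()


if __name__ == "__main__":
    archive = Path("haystack_extensions-0.0.4.tar.gz")
    if archive.exists():
        print("digest matches:", matches(archive, EXPECTED_SDIST_SHA256))
    else:
        print("archive not found; download it first")
```

pip can enforce the same check at install time through its hash-checking mode, in which a requirements file pins each package to a `--hash=sha256:...` value.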