A set of useful extensions for the Haystack AI library.

Project description

Haystack 2.x Custom Extensions

Welcome to the repository for custom extensions of Haystack, version 2.0 and onwards. The collection focuses on components that run other components concurrently, which helps with I/O-bound tasks. It also covers subpipelines, that is, running whole pipelines inside a component of another pipeline.

Quick Start Guide

To develop or create new integrations, you need hatch. Follow the hatch installation instructions for your operating system and platform.

The integrations within this repository are designed to be self-contained, so the first step in working on an integration is to navigate (cd) into the corresponding folder. For instance, to execute the test suite for the Chroma document store, change into that integration's folder and run:

$ hatch run test

Hatch will take care of setting up an isolated Python environment and running the tests.

Installation

Run:

$ pip install haystack-extensions

Usage

Concurrent Components

Below is a code snippet demonstrating how to run several components concurrently (multithreaded) inside another component:

import time
from haystack import component, Pipeline

from haystack_extensions.components.concurrent_runner.runner import (
    ConcurrentComponentRunner,
    NamedComponent
)


@component
class SimplePrintStringWithWaitComponent:
    """
    A component that prints a string and waits for a given number of seconds
    """

    def __init__(self, wait_time: int = 4):
        self.wait_time = wait_time

    @component.output_types(text=str)
    def run(self, text: str) -> dict:
        time.sleep(self.wait_time)
        print(text, " <<== waited for", self.wait_time, "seconds")
        return {"text": text}


if __name__ == "__main__":
    comp1 = SimplePrintStringWithWaitComponent(wait_time=10)
    comp2 = SimplePrintStringWithWaitComponent(wait_time=3)
    comp3 = SimplePrintStringWithWaitComponent(wait_time=5)

    named_components = [NamedComponent("one", comp1), NamedComponent("two", comp2), NamedComponent("three", comp3)]
    concurrent_component_runner = ConcurrentComponentRunner(named_components)

    p = Pipeline()
    p.add_component("concurrent_component_runner", concurrent_component_runner)

    result = p.run(
        data={
            "concurrent_component_runner": {
                "one_text": "Hello",
                "two_text": "World",
                "three_text": "!",
            }
        }
    )

    print(result)

This will lead to the following output and result:

Console output:

World  <<== waited for 3 seconds
!  <<== waited for 5 seconds
Hello  <<== waited for 10 seconds

Result:

result = {'concurrent_component_runner': {'one_text': 'Hello', 'two_text': 'World', 'three_text': '!'}}
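
The input keys in the example combine a component's name with the name of its input (`one_text`, `two_text`, `three_text`). How the runner handles this internally is not documented here. The following is only a minimal standard-library sketch of the idea, not the library's implementation: `run_named_concurrently` and `make_echo` are hypothetical names, and plain functions stand in for Haystack components.

```python
from concurrent.futures import ThreadPoolExecutor
import time


def make_echo(wait_time: float):
    """Build a stand-in 'component': sleeps, then returns its input."""
    def run(text: str) -> dict:
        time.sleep(wait_time)
        return {"text": text}
    return run


def run_named_concurrently(named: dict, inputs: dict) -> dict:
    """Hypothetical helper (an assumption, not part of haystack-extensions):
    route '<name>_<param>' inputs to each callable, run all callables in
    threads, and prefix their outputs with the same name."""
    def call(name):
        prefix = name + "_"
        # Strip the prefix so 'one_text' becomes the keyword argument 'text'.
        kwargs = {k[len(prefix):]: v for k, v in inputs.items() if k.startswith(prefix)}
        return name, named[name](**kwargs)

    with ThreadPoolExecutor(max_workers=len(named)) as pool:
        # pool.map preserves the order of the names, not the completion order.
        results = list(pool.map(call, named))

    return {f"{name}_{key}": value for name, out in results for key, value in out.items()}


named = {"one": make_echo(0.3), "two": make_echo(0.1), "three": make_echo(0.2)}
start = time.perf_counter()
outputs = run_named_concurrently(
    named, {"one_text": "Hello", "two_text": "World", "three_text": "!"}
)
elapsed = time.perf_counter() - start
print(outputs)
print(f"elapsed: {elapsed:.2f}s")
```

Because the three calls sleep in parallel, the elapsed time should be close to the longest sleep rather than the sum of all three, which is the benefit for I/O-bound work.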

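Subpipelines

The following snippet shows how to run whole pipelines concurrently as subpipelines of an enclosing pipeline: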

import time
from haystack import component, Pipeline

from haystack_extensions.components.concurrent_runner.runner import (
    NamedPipeline,
    ConcurrentPipelineRunner
)


@component
class SimplePrintStringWithWaitComponent:
    """
    A component that prints a string and waits for a given number of seconds
    """

    def __init__(self, wait_time: int = 4):
        self.wait_time = wait_time

    @component.output_types(text=str)
    def run(self, text: str) -> dict:
        time.sleep(self.wait_time)
        print(text, " <<== waited for", self.wait_time, "seconds")
        return {"text": text}

if __name__ == "__main__":
    comp1 = SimplePrintStringWithWaitComponent(wait_time=10)
    comp2 = SimplePrintStringWithWaitComponent(wait_time=3)

    pipeline1 = Pipeline()
    pipeline1.add_component("simple_component", comp1)

    pipeline2 = Pipeline()
    pipeline2.add_component("simple_component", comp2)

    concurrent_pipeline_runner = ConcurrentPipelineRunner([NamedPipeline("pipeline1", pipeline1), NamedPipeline("pipeline2", pipeline2)])

    overall_pipeline = Pipeline()

    overall_pipeline.add_component("concurrent_pipeline_runner", concurrent_pipeline_runner)

    results = overall_pipeline.run(
        data={
            "concurrent_pipeline_runner": {
                "pipeline1": {"simple_component": {"text": "Hello"}},
                "pipeline2": {"simple_component": {"text": "World"}},
            }
        }
    )

    print(results)

This will lead to the following output and result:

Console output:

World  <<== waited for 3 seconds
Hello  <<== waited for 10 seconds

Result:

result = {'concurrent_pipeline_runner': {'pipeline1': {'simple_component': {'text': 'Hello'}}, 'pipeline2': {'simple_component': {'text': 'World'}}}}
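
The nested input in the example gives each pipeline its own data under the pipeline's name, and the result uses the same layout. As a rough illustration of that routing, here is a standard-library sketch. It is an assumption-laden stand-in rather than the actual `ConcurrentPipelineRunner`: `FakePipeline` and `run_pipelines_concurrently` are hypothetical names introduced for this example.

```python
from concurrent.futures import ThreadPoolExecutor
import time


class FakePipeline:
    """Stand-in for a pipeline with one component named 'simple_component'."""

    def __init__(self, wait_time: float):
        self.wait_time = wait_time

    def run(self, data: dict) -> dict:
        time.sleep(self.wait_time)
        text = data["simple_component"]["text"]
        return {"simple_component": {"text": text}}


def run_pipelines_concurrently(pipelines: dict, data: dict) -> dict:
    """Hypothetical helper: submit each pipeline with its own input and
    collect the results under the pipeline's name."""
    with ThreadPoolExecutor(max_workers=len(pipelines)) as pool:
        futures = {name: pool.submit(p.run, data[name]) for name, p in pipelines.items()}
        # result() blocks until the corresponding pipeline has finished.
        return {name: future.result() for name, future in futures.items()}


pipelines = {"pipeline1": FakePipeline(0.2), "pipeline2": FakePipeline(0.1)}
results = run_pipelines_concurrently(
    pipelines,
    {
        "pipeline1": {"simple_component": {"text": "Hello"}},
        "pipeline2": {"simple_component": {"text": "World"}},
    },
)
print(results)
```

Collecting results by name keeps the output independent of which pipeline finishes first.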
