CLI tool to manage the lifecycle of a RAG

FastRAG CLI

Installation

pip install fastrag-cli
# or
uv add fastrag-cli

General Usage

In general, using the CLI requires a configuration file. The default plugins provide a YAML configuration reader, but any format works as long as you provide an IConfigLoader that can handle it.

To run the workflow defined in your own config.yaml with verbose output:

fastrag run -v config.yaml

To delete the files cached by these executions (with a confirmation prompt):

fastrag clean config.yaml

To delete the cached files without a prompt:

fastrag clean -y config.yaml

To serve the inference endpoints:

fastrag serve config.yaml

Documentation

To generate the automatic documentation:

typer ./fastrag/__main__.py utils docs > USAGE.md

Architecture

The main benefit of plugins is that they extend what a workflow can do, which requires understanding how FastRAG works. As of now, its core components are:

  • ICache handles the caching capabilities of the workflow.
    • Implementation provided for LocalCache (supported="local").
  • IConfigLoader provides a loading method to transform the given config file into a configuration object.
    • Implementation provided for YamlLoader (supported=[".yaml", ".yml"]) (will decide based on configuration file extension).
  • IRunner orchestrates the steps in the configuration object.
    • Implementation provided for AsyncRunner (supported="async").
  • IStep defines the tasks to be performed in each step and handles the communication of data between steps.
    • Implementation provided for Fetching (supported="fetching"), Parsing (supported="parsing"), Chunking (supported="chunking"), Embedding (supported="embedding") and Benchmarking (supported="benchmarking").
  • Task executes the declared task.
    • Implementation provided for:
      • Fetching:
        • HttpFetcher (supported=["URL"])
        • LocalFetcher (supported=["Path"])
        • SitemapXMLFetcher (supported=["SitemapXML"])
        • CrawlerFetcher (supported=["Crawling"])
      • Parsing:
        • HtmlParser (supported=["HtmlParser"])
        • FileParser (supported=["FileParser"])
      • Chunking:
        • ParentChildChunker (supported=["ParentChild"])
        • RecursiveChunker (supported=["RecursiveChunker"])
        • SlidingWindowChunker (supported=["SlidingWindow"])
      • Embedding:
        • OpenAISimple (supported=["OpenAI-Simple", "openai-simple"])
      • Benchmarking:
        • ChunkQualityBenchmarking (supported=["ChunkQuality"])
        • QuerySetBenchmarking (supported=["QuerySet"])

Providing a new implementation for any of these components is as easy as inheriting from it and running FastRAG with the plugin base directory:

fastrag run config.yaml --plugins <IMPLEMENTATION_DIR> -v
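How FastRAG discovers the classes inside the plugin directory is not described here. As a rough sketch of one common approach, assuming plugins are plain .py files that register themselves when imported, a loader could import every file in the directory. The function name load_plugins and the _plugin_ module prefix are hypothetical, not part of FastRAG.

```python
import importlib.util
import sys
from pathlib import Path


def load_plugins(plugin_dir: str) -> list[str]:
    """Import every .py file under plugin_dir and return the loaded module names.

    Hypothetical helper: FastRAG's real discovery mechanism may differ.
    """
    loaded = []
    for path in sorted(Path(plugin_dir).rglob("*.py")):
        name = f"_plugin_{path.stem}"
        spec = importlib.util.spec_from_file_location(name, path)
        if spec is None or spec.loader is None:
            continue  # not an importable source file
        module = importlib.util.module_from_spec(spec)
        sys.modules[name] = module
        # Executing the module runs its class definitions, which is the
        # moment a self-registering base class would pick them up.
        spec.loader.exec_module(module)
        loaded.append(name)
    return loaded
```

Importing the file is enough in this sketch because registration is assumed to happen as a side effect of defining the subclass.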

Implementing Tasks

Tasks are the most generic components, since they do everything from fetching a URL, to parsing HTML into Markdown, to creating embeddings. We will therefore use a Task implementation as an example:

@dataclass(frozen=True)
class HttpFetcher(Task):
    supported: ClassVar[str] = "URL"

    url: URLField = URLField()
    _cached: bool = field(init=False, default=False, hash=False, compare=False)

    @override
    async def run(self) -> Run:
        if self.cache.is_present(self.url):
            object.__setattr__(self, "_cached", True)
            return

        try:
            async with AsyncClient(timeout=10) as client:
                res = await client.get(self.url)
        except Exception as e:
            yield Event(Event.Type.EXCEPTION, f"ERROR: {e}")
            return

        entry = await self.cache.create(
            self.url,
            res.text.encode(),
            {
                "step": "fetching",
                "format": "html",
                "strategy": HttpFetcher.supported,
            },
        )

        self.result = entry.path

    @override
    def completed_callback(self) -> Event:
        return Event(
            Event.Type.COMPLETED,
            f"{'Cached' if self._cached else 'Fetched'} {self.url}",
        )

Plugin Architecture

A few things are worth noting here. First, the class inherits from Task, which registers the implementation. For registration to work, the class must also declare a supported attribute.

class HttpFetcher(Task):
    # Selected by the strategy name URL
    supported: str = "URL"

class HttpFetcher(Task):
    # Selected by either strategy name, URL or HTTP
    supported: list[str] = ["URL", "HTTP"]

@dataclass(frozen=True)
class HttpFetcher(Task):
    # The same but with dataclasses
    supported: ClassVar[str] = "URL"

This supported attribute is the one that must match the configuration step strategy and will be used when deciding which implementation to use.

# config.yaml
steps:
  fetching:
    - strategy: URL # Must match this
    - strategy: HTTP # or this
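The matching between supported and strategy can be pictured as a lookup table filled in when subclasses are defined. The sketch below is an assumption about the general pattern, not FastRAG's actual code: the Task base class here is a simplified stand-in, and the registry attribute and the resolve function are hypothetical names.

```python
class Task:
    """Simplified stand-in for the real base class (hypothetical)."""

    supported = ()  # a single strategy name or a list of names
    registry: dict = {}  # strategy name -> implementing class

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        names = cls.supported
        if isinstance(names, str):
            names = [names]  # normalise the single-name form
        for name in names:
            Task.registry[name] = cls


class HttpFetcher(Task):
    supported = ["URL", "HTTP"]


class LocalFetcher(Task):
    supported = "Path"


def resolve(strategy: str) -> type:
    """Return the class registered for a strategy name from the config."""
    try:
        return Task.registry[strategy]
    except KeyError:
        raise LookupError(f"no Task supports strategy {strategy!r}") from None
```

With this table, `resolve("URL")` and `resolve("HTTP")` both return HttpFetcher, and an unknown strategy fails early with a clear error.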

Initialization and Arguments

Although it depends on the Step implementation, the general pattern is this: when the tasks to perform are defined and the required implementations are injected, each class receives its constructor parameters from the configuration object. In this case, it is passed the url argument.

# config.yaml
steps:
  fetching:
    - strategy: URL
      params:
        url: https://agrospai.udl.cat
    - strategy: HTTP
      params:
        url: https://agrospai.udl.cat
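To make the parameter passing concrete, here is a minimal sketch that assumes the params mapping is unpacked as keyword arguments into the task constructor. The STRATEGIES table and the build_tasks function are hypothetical, the dataclass is a simplified stand-in for the real HttpFetcher, and the dictionary stands for the already-parsed configuration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class HttpFetcher:
    """Simplified stand-in: only the constructor parameter matters here."""

    url: str


# Hypothetical lookup table standing in for the plugin registry.
STRATEGIES = {"URL": HttpFetcher, "HTTP": HttpFetcher}

# Parsed equivalent of the fetching step in config.yaml.
config = {
    "steps": {
        "fetching": [
            {"strategy": "URL", "params": {"url": "https://agrospai.udl.cat"}},
            {"strategy": "HTTP", "params": {"url": "https://agrospai.udl.cat"}},
        ]
    }
}


def build_tasks(step_config: list) -> list:
    """Instantiate one task per configured item, passing params as kwargs."""
    tasks = []
    for item in step_config:
        cls = STRATEGIES[item["strategy"]]
        tasks.append(cls(**item.get("params", {})))
    return tasks


tasks = build_tasks(config["steps"]["fetching"])
```

A misspelled key under params would surface here as a TypeError from the constructor, which is a convenient place to catch configuration mistakes.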

Task Methods

The run method, inherited from Task, is where the heavy lifting happens. It comes in two forms: the simpler one shown here, which receives no parameters, and another that is discussed later.

@override
async def run(self) -> AsyncGenerator[Event, None]:
    ...

As the type hint shows, the method is expected to yield events. We provide an event base class and some subclasses for each workflow step. These events are simply feedback to display in the terminal (the behaviour is defined in the Step). In this example, feedback is only emitted on failure.

async def run(self) -> AsyncGenerator[Event, None]:
    ...
    except Exception as e:
        yield Event(Event.Type.EXCEPTION, f"ERROR: {e}")
        return

Once the Task's main work is finished, feedback is reported through a completed_callback method, which we must also define. Instead of yielding, it returns a single feedback event.

@override
def completed_callback(self) -> Event:
    return Event(
        Event.Type.COMPLETED,
        f"{'Cached' if self._cached else 'Fetched'} {self.url}",
    )
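The interplay between a yielding run and a returning completed_callback can be sketched with a small driver. Everything below is a simplified stand-in written under assumptions: EventType, Event, FlakyTask and drive are hypothetical, and the text does not state whether the real Step still calls completed_callback after a failure, so this sketch simply always calls it.

```python
import asyncio
from dataclasses import dataclass
from enum import Enum, auto
from typing import AsyncGenerator


class EventType(Enum):
    EXCEPTION = auto()
    COMPLETED = auto()


@dataclass(frozen=True)
class Event:
    type: EventType
    message: str


class FlakyTask:
    """Hypothetical task whose work fails and is reported as an event."""

    async def run(self) -> AsyncGenerator[Event, None]:
        try:
            raise ConnectionError("host unreachable")
        except Exception as e:
            yield Event(EventType.EXCEPTION, f"ERROR: {e}")
            return

    def completed_callback(self) -> Event:
        return Event(EventType.COMPLETED, "Done")


async def drive(task) -> list:
    """Collect every yielded event, then append the completion event."""
    events = [event async for event in task.run()]
    events.append(task.completed_callback())
    return events


events = asyncio.run(drive(FlakyTask()))
for event in events:
    print(event.type.name, event.message)
```

Because run is an async generator, the driver consumes it with `async for`, while completed_callback is an ordinary method call.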

Task Communication

Besides doing its work and giving feedback, a Task is expected to communicate with other steps; otherwise it would not be a workflow. To do so, it should use the cache, as the example does, for two purposes: first, to skip execution when the expected result is already cached, saving time and resources; second, to store its results for use in the following steps.

async def run(self) -> AsyncGenerator[Event, None]:
    if self.cache.is_present(self.url):
        object.__setattr__(self, "_cached", True)
        return

async def run(self) -> AsyncGenerator[Event, None]:
    ...
    await self.cache.create(
        self.url,
        res.text.encode(),
        "fetching",
        {"format": "html", "strategy": HttpFetcher.supported},
    )

The two main cache methods are:

  • is_present: Check for cache entry existence given a URI. Also checks for lifetime validity in case of LocalCache.
  • create: Creates a new cache entry given its uri (in this case the url), contents, step (in this case fetching) and metadata (arbitrary data). Besides the given data, the entries will also contain a timestamp and path.
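The two methods can be illustrated with a small in-memory stand-in. This is a sketch under assumptions, not the LocalCache implementation: InMemoryCache, Entry and lifetime_seconds are hypothetical names, create is synchronous here although the examples above await it, and the path field is left out because nothing is written to disk.

```python
import time
from dataclasses import dataclass, field


@dataclass
class Entry:
    uri: str
    content: bytes
    step: str
    metadata: dict
    # Recorded at creation so lifetime validity can be checked later.
    timestamp: float = field(default_factory=time.time)


class InMemoryCache:
    """Hypothetical cache keyed by URI with a fixed entry lifetime."""

    def __init__(self, lifetime_seconds: float = 3600.0):
        self.lifetime = lifetime_seconds
        self._entries: dict = {}

    def is_present(self, uri: str) -> bool:
        """True only if the entry exists and has not outlived its lifetime."""
        entry = self._entries.get(uri)
        if entry is None:
            return False
        return (time.time() - entry.timestamp) < self.lifetime

    def create(self, uri: str, content: bytes, step: str, metadata: dict) -> Entry:
        """Store a new entry, replacing any previous one for the same URI."""
        entry = Entry(uri, content, step, dict(metadata))
        self._entries[uri] = entry
        return entry


cache = InMemoryCache(lifetime_seconds=60)
if not cache.is_present("https://agrospai.udl.cat"):
    cache.create(
        "https://agrospai.udl.cat",
        b"<html></html>",
        "fetching",
        {"format": "html", "strategy": "URL"},
    )
```

Checking is_present before doing the work is what lets a repeated run skip fetches whose results are still valid.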

Now that we have covered how to write a simple Task for HTTP retrieval, we will cover tasks that depend on previous results (cache entries). As mentioned earlier, there are two ways of defining run: the simpler one, without any arguments, and the following one.

Here is another Task example, this time the HtmlParser. Its purpose is to transform HTML files into Markdown, which LLMs tend to handle better. To do so, it needs access to the results of the previous fetching step. Since this is a common need across steps, it has been abstracted away.

@dataclass(frozen=True)
class HtmlParser(Task):
    supported: ClassVar[str] = "HtmlParser"
    filter: ClassVar[Filter] = MetadataFilter(step="fetching", format="html")

    @override
    async def run(
        self,
        uri: str,
        entry: CacheEntry,
    ) -> AsyncGenerator[Event, None]:
        ...

As the previous code shows, there are two main differences from the HttpFetcher: first, the definition of the filter attribute, and second, the signature of the run method.

@override
async def run(
    self,
    uri: str,
    entry: CacheEntry,
) -> AsyncGenerator[Event, None]:
    ...

This method signature means that the data (the cache entry) is passed in with the call, so the task implementation does not have to retrieve it. To declare which subset of cache entries the task instance will process, a filter class attribute is defined. This filter is an instance of Filter[CacheEntry] and selects the cache entries that are suitable for this task. The Step implementation is responsible for this work: it gathers all the relevant cache entries.

# Rewritten for non-dataclasses
filter: Filter = MetadataFilter(step="fetching", format="html")

This filter is a MetadataFilter, a subclass of Filter that only accepts cache entries whose metadata contains the given kwargs. Filters can also be combined: the & operator joins two filters into an AndFilter, a basic filter that accepts an entry only if all of its sub-filters accept it. If needed, there is also an OrFilter, built with the | operator.
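The filter composition described above can be sketched with operator overloading. The classes below are simplified stand-ins that reuse the names from the text; the accepts method name is an assumption, as is the OrFilter behaviour of accepting an entry when any sub-filter does.

```python
from dataclasses import dataclass, field


@dataclass
class CacheEntry:
    uri: str
    metadata: dict = field(default_factory=dict)


class Filter:
    def accepts(self, entry: CacheEntry) -> bool:  # hypothetical method name
        raise NotImplementedError

    def __and__(self, other: "Filter") -> "Filter":
        return AndFilter(self, other)

    def __or__(self, other: "Filter") -> "Filter":
        return OrFilter(self, other)


class MetadataFilter(Filter):
    """Accepts entries whose metadata contains every given key/value pair."""

    def __init__(self, **expected):
        self.expected = expected

    def accepts(self, entry: CacheEntry) -> bool:
        return all(
            key in entry.metadata and entry.metadata[key] == value
            for key, value in self.expected.items()
        )


class AndFilter(Filter):
    def __init__(self, *filters: Filter):
        self.filters = filters

    def accepts(self, entry: CacheEntry) -> bool:
        return all(f.accepts(entry) for f in self.filters)


class OrFilter(Filter):
    def __init__(self, *filters: Filter):
        self.filters = filters

    def accepts(self, entry: CacheEntry) -> bool:
        return any(f.accepts(entry) for f in self.filters)


html = CacheEntry("a", {"step": "fetching", "format": "html"})
pdf = CacheEntry("b", {"step": "fetching", "format": "pdf"})

fetched_html = MetadataFilter(step="fetching") & MetadataFilter(format="html")
any_format = MetadataFilter(format="html") | MetadataFilter(format="pdf")
selected = [e.uri for e in (html, pdf) if fetched_html.accepts(e)]
```

Passing several kwargs to one MetadataFilter and joining single-key filters with & select the same entries in this sketch; the operators matter more when mixing different kinds of filters.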

NOTE: with the default Step implementations, an asyncio.Task is created and awaited for every entry that passes the given filter.
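The one-task-per-entry behaviour can be sketched with the standard asyncio API. The process and run_step functions are hypothetical placeholders for a task's run method and the Step's scheduling; only asyncio.create_task and asyncio.gather are real library calls.

```python
import asyncio


async def process(uri: str) -> str:
    """Placeholder for per-entry work such as parsing one cached file."""
    await asyncio.sleep(0)  # stand-in for real I/O
    return uri.upper()


async def run_step(uris: list) -> list:
    # One asyncio.Task per matching entry, all scheduled before any is awaited.
    tasks = [asyncio.create_task(process(uri)) for uri in uris]
    # gather waits for all of them and returns results in input order.
    return await asyncio.gather(*tasks)


results = asyncio.run(run_step(["a.html", "b.html"]))
```

Since every matching entry becomes a concurrent task, a filter that matches many entries starts many tasks at once; bounding concurrency, for example with asyncio.Semaphore, is a possible refinement if that becomes a problem.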
