CLI tool to manage the lifecycle of a RAG
Project description
FastRAG CLI
Installation
pip install fastrag-cli
# or
uv add fastrag-cli
General Usage
Generally to use the CLI you will need a configuration file. The default plugins provide a yaml configuration reader, but it can be of any format, as long as you provide an IConfigLoader that can handle it.
To run your own configuration workflow config.yaml with verbosity.
fastrag run -v config.yaml
Delete the cached files after all these executions (with prompt)
fastrag clean config.yaml
Delete the cached files (without prompt)
fastrag clean -y config.yaml
To serve the inference endpoints
fastrag serve config.yaml
Documentation
To generate the automatic documentation
typer ./fastrag/__main__.py utils docs > USAGE.md
General Workflow
Architecture
The main benefit of using plugins is being able to expand the workflow execution capabilities, which requires to understand how it works, as of now, the core components forming FastRAG are:
- ICache handles the caching capabilities of the workflow.
- Implementation provided for
LocalCache (supported="local").
- Implementation provided for
- IConfigLoader provides a loading method to transform the given config file into a configuration object.
- Implementation provided for
YamlLoader (supported=[".yaml", ".yml"])(will decide based on configuration file extension).
- Implementation provided for
- IRunner orchestrates the steps in the configuration object.
- Implementation provided for
AsyncRunner (supported="async").
- Implementation provided for
- IStep defines the tasks to be performed in every step, handles the communication of data between steps.
- Implementation provided for
Fetching (supported="fetching"),Parsing (supported="parsing"),Chunking (supported="chunking"),Embedding (supported="embedding")andBenchmarking (supported="benchmarking").
- Implementation provided for
- Task executes the declared task.
- Implementation provided for:
Fetching:HttpFetcher (supported=["URL"])LocalFetcher (supported=["Path"])SitemapXMLFetcher (supported=["SitemapXML"])CrawlerFetcher (supported=["Crawling"])
Parsing:HtmlParser (supported=["HtmlParser"])FileParser (supported=["FileParser"])
Chunking:ParentChildChunker (supported=["ParentChild"])RecursiveChunker (supported=["RecursiveChunker"])SlidingWindowChunker (supported=["SlidingWindow"])
Embedding:OpenAISimple (supported=["OpenAI-Simple", "openai-simple"])
Benchmarking:ChunkQualityBenchmarking (supported=["ChunkQuality"])QuerySetBenchmarking (supported=["QuerySet"])
- Implementation provided for:
Providing a new implementation for any of these components is as easy as inheriting from them and executing fastRAG with the plugin base dir as:
fastrag run config.yaml --plugins <IMPLEMENTATION_DIR> -v
Implementing Tasks
The most generic components are Tasks, since they do from fetching from a URL, to parsing HTML to Markdown, to creating embeddings; hence we will make an example of Task implementation:
@dataclass(frozen=True)
class HttpFetcher(Task):
supported: ClassVar[str] = "URL"
url: URLField = URLField()
_cached: bool = field(init=False, default=False, hash=False, compare=False)
@override
async def run(self) -> Run:
if self.cache.is_present(self.url):
object.__setattr__(self, "_cached", True)
return
try:
async with AsyncClient(timeout=10) as client:
res = await client.get(self.url)
except Exception as e:
yield Event(Event.Type.EXCEPTION, f"ERROR: {e}")
return
entry = await self.cache.create(
self.url,
res.text.encode(),
{
"step": "fetching",
"format": "html",
"strategy": HttpFetcher.supported,
},
)
self.result = entry.path
@override
def completed_callback(self) -> Event:
return Event(
Event.Type.COMPLETED,
f"{'Cached' if self._cached else 'Fetched'} {self.url}",
)
Plugin Architecture
Here we can see a few things, first of all, we have our class which inherits from Task which will register the implementation, to do so it's also needed to specify a supported attribute.
class HttpFetcher(Task):
# It supports using it as URL
supported: str = "URL"
class HttpFetcher(Task):
# It supports using both URL or HTTP
supported: str = ["URL", "HTTP"]
@dataclass(frozen=True)
class HttpFetcher(Task):
# The same but with dataclasses
supported: ClassVar[str] = "URL"
This supported attribute is the one that must match the configuration step strategy and will be used when deciding which implementation to use.
# config.yaml
steps:
fetching:
- strategy: URL # Must match this
- strategy: HTTP # or this
Initialization and Arguments
Although it depends of the Step implementation, generally when defining the tasks to perform, upon injecting the needed implementations, the class will be provided with its constructor parameters from the given configuration object, in this case, it will pass the url argument.
# config.yaml
steps:
fetching:
- strategy: URL
params:
url: https://agrospai.udl.cat
- strategy: HTTP
params:
url: https://agrospai.udl.cat
Task Methods
As of the run method, which is inherited from Task, it's the one supposed to do the heavy-lifting. There are two cases, the shown, which is the simpler, where it does not recieve any parameters, and another on which will be discussed later.
@override
async def run(self) -> Run:
...
Although a TypeAlias is used, the run method is supposed to be of type Run, which is an AsyncGenerator[Event, None], thus the method is expected to yield events. These events are nothing but feedback to show in the terminal (behaviour defined in IStep). In this given example, we only show feedback upon failure.
async def run(self) -> Run:
...
except Exception as e:
yield Event(Event.Type.EXCEPTION, f"ERROR: {e}")
return
Once the main purpose of this Task is finished, we must also define a completed_callback method which, instead of yielding, it returns an Event.
@override
def completed_callback(self) -> Event:
return Event(
Event.Type.COMPLETED,
f"{'Cached' if self._cached else 'Fetched'} {self.url}",
)
Task Communication
Apart from executing as supposed and giving feedback, a Task is expected to communicate with other steps, otherwise it wouldn't be a workflow. To do so, it should make use of the cache, as in the shown example:
Firstly, for skipping the execution if the expected result is already cached to save time and resources. Secondly, to cache the results to use in the next steps (and runs).
async def run(self) -> Run:
if self.cache.is_present(self.url):
object.__setattr__(self, "_cached", True)
return
async def run(self) -> Run:
...
await self.cache.create(
self.url,
res.text.encode(),
"fetching",
{"format": "html", "strategy": HttpFetcher.supported},
)
The cache main methods are the following:
is_present: Check for cache entry existence given a URI. Also checks for lifetime validity.create: Creates a new cache entry given its URI (in this case the URL), contents, (in this case fetching) and metadata (arbitrary data). Besides the given data, the entries will also contain a timestamp (and path in the case ofLocalCacheimplementation).
Now that we have covered how to make a simple Task for http retrieving, we will cover how to make other kind of tasks that depend on previous results (cache entries). As commented earlier, there are two ways of using run, the simpler way, without any arguments, and the following.
We will present you with another Task example, this time the HtmlParser. In this case, the purpose of this task is to transform HTML files into Markdown, since LLMs like Markdown better. To do this task, we need to access the previous fetching step results, since this is a common occurence in multiple steps, it has been abstracted away.
@dataclass(frozen=True)
class HtmlParser(Task):
supported: ClassVar[str] = "HtmlParser"
filter: ClassVar[Filter] = MetadataFilter(step="fetching", format="html")
@override
async def run(
self,
uri: str,
entry: CacheEntry,
) -> Run:
...
As we can see in the previous code, there are two main differences with the HttpFetcher. Firstly, the definition of the filter attribute, and lastly, the run method signature.
@override
async def run(
self,
uri: str,
entry: CacheEntry,
) -> Run:
...
This method signature implies that the data (cache entry) is being passed to the function call and that the task implementation doesn't have to worry about it. To declare which subset of Cache Entries the task instance will process, a filter class attribute is defined. This filter is an instance of Filter[CacheEntry], which will be used to filter out the cache entries that are fitted to use in this task. The responsable of doing this labor is the Step implementation, which will gather all the relevant cache entries.
# Rewrote for non-dataclasses
filter: Filter = MetadataFilter(step="fetching", format="html")
This filter uses a special subclass of Filter, the MetadataFilter, which only accepts the cache entries that have the given kwargs in their metadata. But we could append filters using the operator &, which joins both filters in an AndFilter, a basic filter that ensures all its sub-filters accept the given entry for it to accept it, in case it's needed, or the operator | which results in a OrFilter.
NOTE that for every entry compliant with the given filter, with the default
Stepimplementations, anasyncio.Taskwill be created and awaited for.
Experiments
Experiment definition
Now that we have defined the sources of our experiments, we are yet to define the experiment steps. The experiment step, as in the following configuration file, contains the different steps that we want to test our workflow with, every step may have multiple different strategies as in the following example.
# config.yaml
experiments:
strategy: async_experiments
params:
max_concurrent: 3
steps:
chunking:
- strategy: ParentChild
params:
embedding_model: "paraphrase-multilingual:latest"
embedding_api_url: "https://chat.agrospai.udl.cat/ollama/api/embed"
embedding_api_key: ${CHAT_API_KEY}
- strategy: SlidingWindow
params:
chunk_size: 1200
chunk_overlap: 200
embedding:
- strategy: OpenAI-Simple
params:
model: nomic-embed-text-v2-moe:latest
api_key: ${CHAT_API_KEY}
url: https://chat.agrospai.udl.cat/ollama/api/embed
benchmarking:
- strategy: QuerySet
params:
questions:
- [
"Who is the staff of the agrospai project?",
"Roberto Garcia Gonzalez"
]
- [
"What technologies does AgrospAI use to make payments",
"Smart Contracts", "Blockchain"
]
- strategy: ChunkQuality
As you can see, we have to define a strategy for running the different steps, in this case async_experiments, and its different configurable parameters, in this case max_concurrent, and the proper experiment steps, in this case chunking, embedding and benchmarking (a special case).
The given async_experiments will get the different steps and do their permutations, having, in the end, all possible variants of the substeps. In this case, it will generate the following experiments:
- Experiment #1: ParentChild chunking + OpenAI-Simple + All benchmarks
- Experiment #2: SlidingWindow chunking + OpenAI-Simple + All benchmarks
The benchmarking sub-step, in the async_experiments is ALWAYS expected and will be included in all experiments.
Experiment execution
Now that all the experiments have been defined, they will be executed concurrently (but synchronously inside the experiment), providing feedback to the user's terminal.
To implement an experiment step such as chunking or embedding, it's the same as the previous shown Task, but having in mind to add a special key value pair in the cache entry metadata with the current experiment hash, so the next experiment step uses the correct entries (to not using chunks from other experiments).
# parent_child.py
# Example of chunking storing results
existed, entries = await self.cache.get_or_create(
...
metadata={
"step": "chunking",
"strategy": ParentChildChunker.supported,
"experiment": self.experiment.experiment_hash, # THIS
},
)
Which will be used in the following EmbeddingStep task definition as:
@override
async def get_tasks(self) -> Tasks:
for task in self._tasks:
# `self.filter` is the current experiment filter
entries = await self.cache.get_entries(self.filter & task.filter)
yield (task, [task.run(uri, entry) for uri, entry in entries])
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file fastrag_cli-0.1.8.tar.gz.
File metadata
- Download URL: fastrag_cli-0.1.8.tar.gz
- Upload date:
- Size: 6.3 MB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.0.1 CPython/3.12.8
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
cfedd14d6ec4cca49a63ac99545279d4d28f0e59bd432ece70ed77e51c70a664
|
|
| MD5 |
0cab48d1dd893d4775595689cbc4152d
|
|
| BLAKE2b-256 |
c28d2cd72cec5e39d4143e220c8a07151f74d361531cbb7098e926b943068428
|
File details
Details for the file fastrag_cli-0.1.8-py3-none-any.whl.
File metadata
- Download URL: fastrag_cli-0.1.8-py3-none-any.whl
- Upload date:
- Size: 4.9 MB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.0.1 CPython/3.12.8
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
c6a39b59547f490e1baa408cc9a83eeae20562a5783e4159066ac596d98c0dd7
|
|
| MD5 |
f2daef2d9b81061c80c54bbdc5b3185c
|
|
| BLAKE2b-256 |
c4467e21e87deca05776b6de100e43140d45c743cb41cdc6cd22cc21f4dde842
|