Black LangCube

A LangGraph-based extension framework designed to facilitate the development of complex applications by providing a structured way to define and manage workflows, enabling the integration of various AI models and tools into a cohesive system.
🚀 Features
- BaseGraph Framework: Foundational interface for constructing, compiling, and executing stateful workflow graphs
- Data Structures: Pydantic models for scientific article metadata, search strategies, outlines, and more
- LLM Nodes: Pre-built nodes for common language model operations
- Helper Utilities: Token counting, result processing, file management, and workflow utilities
- Subgraph System: Modular subworkflows for translation, output generation, and specialized tasks
- Extensible Architecture: Easy to extend with custom nodes and workflows
📦 Installation
From PyPI (when published):

```bash
pip install black_langcube
```

Development installation:

```bash
git clone https://github.com/cerna-kostka/black-langcube.git
cd black-langcube
pip install -e .
```

With optional dependencies:

```bash
pip install black_langcube[dev,examples]
```
🏗️ Core Components
BaseGraph
The foundation for building stateful workflow graphs using LangGraph:
```python
from langgraph.graph import START, END

from black_langcube.graf.graph_base import BaseGraph, GraphState

class MyCustomGraph(BaseGraph):
    def __init__(self, user_message, folder_name, language):
        # MyGraphState is your own GraphState subclass;
        # my_node_function below is your own node callable.
        super().__init__(MyGraphState, user_message, folder_name, language)
        self.build_graph()

    def build_graph(self):
        # Add nodes and edges to your workflow
        self.add_node("my_node", my_node_function)
        self.add_edge(START, "my_node")
        self.add_edge("my_node", END)

    @property
    def workflow_name(self):
        return "my_custom_graph"
```
LLMNode
A base class for defining nodes that interact with language models:
```python
from black_langcube.llm_modules.LLMNodes.LLMNode import LLMNode

class MyCustomNode(LLMNode):
    def generate_messages(self):
        return [
            ("system", "You are a helpful assistant"),
            ("human", self.state.get("user_input", "")),
        ]

    def execute(self, extra_input=None):
        result, tokens = self.run_chain(extra_input)
        return {"output": result, "tokens": tokens}
```
Data Structures
Pydantic models for structured data handling:
```python
from black_langcube.data_structures.data_structures import Article, Strategies, Outline

# Use pre-defined data structures
article = Article(topic="AI Research", language="English")
strategies = Strategies(strategy1="Search academic papers", strategy2="Analyze trends")
```
📚 Architecture
The library is organized into several key modules:
- `graf/`: Core graph classes and workflow definitions
- `data_structures/`: Pydantic models for data validation
- `llm_modules/`: Language model integration and node definitions
- `helper_modules/`: Utility functions and helper classes
- `messages/`: Message formatting and composition utilities
- `prompts/`: Prompt templates and configurations
- `format_instructions/`: Output formatting utilities
- `database/`: SQLAlchemy async ORM models and `DatabaseService`
- `storage_service.py`: Three-mode storage abstraction (`file`, `database`, `dual`)
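The import paths used throughout this README map onto those modules directly, for example:

```python
from black_langcube.graf.graph_base import BaseGraph, GraphState        # graf/
from black_langcube.data_structures.data_structures import Article      # data_structures/
from black_langcube.llm_modules.LLMNodes.LLMNode import LLMNode         # llm_modules/
from black_langcube.storage_service import StorageService               # storage_service.py
```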
🛠️ Usage Examples
Basic Workflow
```python
from black_langcube.graf.graph_base import BaseGraph, GraphState
from langgraph.graph import START, END

class SimpleWorkflow(BaseGraph):
    def __init__(self, message, folder, language):
        super().__init__(GraphState, message, folder, language)
        self.build_graph()

    def build_graph(self):
        def process_message(state):
            return {"result": f"Processed: {state['messages'][-1].content}"}

        self.add_node("process", process_message)
        self.add_edge(START, "process")
        self.add_edge("process", END)

    @property
    def workflow_name(self):
        return "simple_workflow"

# Usage
workflow = SimpleWorkflow("Hello, world!", "output", "English")
result = workflow.run()
```
Using Subgraphs
```python
from black_langcube.graf.subgrafs.translator_en_subgraf import TranslatorEnSubgraf

# Translation subgraph (`config` is your graph configuration object,
# assumed to be defined elsewhere in your application)
translator = TranslatorEnSubgraf(config, subfolder="translations")
result = translator.run(extra_input={
    "translation_input": "Bonjour le monde",
    "language": "French",
})
```
🔧 Configuration
The library uses environment variables for configuration. Copy .env.example
from the project root to .env and fill in your values — it documents every
configurable variable with its default and a one-line description.
```bash
OPENAI_API_KEY=your_openai_api_key_here

# Optional: LangChain configuration
LANGCHAIN_API_KEY=your_langchain_api_key_here
LANGCHAIN_TRACING_V2=true
```
LLM Configuration
Global provider
Set PROVIDER to choose the default LLM provider for every processing step:
```bash
PROVIDER=openai   # openai (default) | gemini | mistral
```
Per-step provider overrides
Each pipeline step can use a different provider without changing any code.
Set {STEP}_PROVIDER to override only that step; all other steps continue to
use PROVIDER:
| Step | Override variable | Example |
|---|---|---|
| `llm_analyst()` | `ANALYST_PROVIDER` | `ANALYST_PROVIDER=gemini` |
| `llm_outline()` | `OUTLINE_PROVIDER` | `OUTLINE_PROVIDER=openai` |
| `llm_text()` | `TEXT_PROVIDER` | `TEXT_PROVIDER=gemini` |
| `llm_check_title()` | `CHECK_TITLE_PROVIDER` | `CHECK_TITLE_PROVIDER=openai` |
| `llm_title_abstract()` | `TITLE_ABSTRACT_PROVIDER` | `TITLE_ABSTRACT_PROVIDER=openai` |
| `get_llm_low()` | `LOW_PROVIDER` | `LOW_PROVIDER=mistral` |
| `get_llm_high()` | `HIGH_PROVIDER` | `HIGH_PROVIDER=openai` |
Per-step model name overrides
Override the model name for a specific (provider, step) combination using
{PROVIDER}_MODEL_{STEP}:
```bash
OPENAI_MODEL_LOW=gpt-4o-mini               # default
OPENAI_MODEL_HIGH=gpt-4.1                  # default
GEMINI_MODEL_ANALYST=gemini-2.5-pro        # default
GEMINI_MODEL_CHECK_TITLE=gemini-2.5-flash  # use a cheaper model for title checks
```
Note: `{STEP}_PROVIDER` overrides are read on every factory call and take effect immediately, without a restart. `{PROVIDER}_MODEL_{STEP}` overrides are evaluated once at module import time, so a process restart is required for changes to model-name env vars to take effect.
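A quick sketch of those restart semantics follows. It assumes `llm_analyst()` from the table above is importable as a factory function; the exact import path is an assumption, not confirmed by this README:

```python
import os

# Assumption: hypothetical import path for the llm_analyst() factory.
from black_langcube.llm_modules.llm_functions import llm_analyst

os.environ["ANALYST_PROVIDER"] = "gemini"
llm = llm_analyst()  # provider override is read on this call: takes effect now

os.environ["GEMINI_MODEL_ANALYST"] = "gemini-2.5-flash"
llm = llm_analyst()  # model-name override was resolved at import time:
                     # a process restart is needed before this change applies
```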
Mixed-provider example
Use Gemini for cost-sensitive steps and OpenAI for quality-critical ones without any code changes:
```bash
PROVIDER=openai              # default for all unspecified steps
ANALYST_PROVIDER=gemini      # cost-sensitive analysis
TEXT_PROVIDER=gemini         # cost-sensitive text generation
OUTLINE_PROVIDER=openai      # quality-critical outline
CHECK_TITLE_PROVIDER=openai  # quality-critical title check

GEMINI_API_KEY=your-gemini-key-here
OPENAI_API_KEY=your-openai-key-here
```
Verifying the resolved configuration
Use get_llm_config_summary() to print the resolved (provider, model) for
every step — useful at startup or in test logs:
```python
from black_langcube import get_llm_config_summary

summary = get_llm_config_summary()
for step, info in summary.items():
    print(f"{step:20s} provider={info['provider']} model={info['model']}")
```
Example output with the mixed-provider configuration above:
```
analyst              provider=gemini model=gemini-2.5-pro
outline              provider=openai model=gpt-4.1
text                 provider=gemini model=gemini-2.5-pro
check_title          provider=openai model=gpt-4.1
title_abstract       provider=openai model=gpt-4.1
low                  provider=openai model=gpt-4o-mini
high                 provider=openai model=gpt-4.1
```
Optional provider dependencies
The default `pip install black_langcube` includes only the OpenAI integration.
Install additional extras for other providers:

```bash
pip install black_langcube[gemini]   # adds langchain-google-genai
pip install black_langcube[mistral]  # adds langchain-mistralai
```
Fail-Fast Validation
Call validate_config() at the top of your application entry point to detect
misconfiguration immediately, before any pipeline execution begins:
```python
import sys

from black_langcube import validate_config, ConfigurationError

try:
    validate_config()
except ConfigurationError as e:
    print(f"Configuration error: {e}", file=sys.stderr)
    sys.exit(1)
```
validate_config() checks every required environment variable and raises
ConfigurationError with a message listing all missing variables, so you
see every problem at once. It is safe to call multiple times (idempotent).
API keys are stored internally as pydantic.SecretStr, which prevents the raw
value from appearing in str(), repr(), or log output. Call
.get_secret_value() only at the last moment when the key must be used.
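For illustration, pydantic's `SecretStr` masking behaves like this (standard pydantic behavior, not library-specific code):

```python
from pydantic import SecretStr

key = SecretStr("sk-example-not-a-real-key")

print(key)        # **********  (masked in str())
print(repr(key))  # SecretStr('**********')  (masked in repr() and logs)
print(key.get_secret_value())  # the raw key; call only where it is consumed
```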
Storage and Database Configuration
The library supports three output storage modes controlled by the STORAGE_MODE
environment variable:
| `STORAGE_MODE` | Behavior |
|---|---|
| `file` (default) | Write results to timestamped folders (existing behavior, fully backward-compatible) |
| `database` | Write results only to the database |
| `dual` | Write to both the file system and the database (recommended migration path) |
Set a database connection URL via the DATABASE_URL environment variable:
```bash
# SQLite (local/testing)
DATABASE_URL=sqlite:///./black_langcube.db

# PostgreSQL (production)
DATABASE_URL=postgresql://user:password@host:5432/dbname
```
The library automatically converts DATABASE_URL to the appropriate async
dialect (postgresql+asyncpg:// or sqlite+aiosqlite://).
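That rewrite is equivalent to the sketch below, an illustration of the documented behavior rather than the library's actual code:

```python
def to_async_url(url: str) -> str:
    """Illustrative equivalent of the library's URL-dialect conversion."""
    if url.startswith("postgresql://"):
        return url.replace("postgresql://", "postgresql+asyncpg://", 1)
    if url.startswith("sqlite://"):
        return url.replace("sqlite://", "sqlite+aiosqlite://", 1)
    return url

assert to_async_url("sqlite:///./black_langcube.db") == "sqlite+aiosqlite:///./black_langcube.db"
assert to_async_url("postgresql://user:password@host:5432/dbname").startswith("postgresql+asyncpg://")
```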
Optional database dependencies
Install the database extras to enable database-backed storage:
```bash
pip install black_langcube[database]
```

This installs `sqlalchemy[asyncio]>=2.0`, `asyncpg` (PostgreSQL), and `aiosqlite` (SQLite / tests).
Migration guide for existing file-mode users
Existing deployments are unaffected by default. STORAGE_MODE defaults to
file when the environment variable is unset. To migrate:
1. Install `black_langcube[database]`.
2. Set `DATABASE_URL` to your database connection string.
3. Start with `STORAGE_MODE=dual` to write to both file and database while you verify the database output (see the example after this list).
4. Switch to `STORAGE_MODE=database` once you are satisfied.
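A migration-phase `.env` might look like this (values are placeholders):

```bash
STORAGE_MODE=dual
DATABASE_URL=postgresql://user:password@host:5432/dbname
```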
Using StorageService directly
```python
import asyncio

from black_langcube.storage_service import StorageService

async def main():
    # Uses STORAGE_MODE and DATABASE_URL from the environment
    storage = StorageService()
    await storage.save_graph_output(
        session_id="my-session-uuid",
        graph_name="graf1",
        data={"result": "..."},
        step_name="analysis",
    )

asyncio.run(main())
```
📖 Examples
See the examples/ directory for complete working examples:
- Basic Graph: Simple workflow with custom nodes
- Translation Pipeline: Multi-language processing workflow
- Scientific Article Processing: Complex multi-step analysis pipeline
- Custom Data Structures: Extending the framework with your own models
🧪 Development
Setting up development environment:
```bash
git clone https://github.com/cerna-kostka/black-langcube.git
cd black-langcube
pip install -e .[dev]
```

Running tests:

```bash
pytest
```

Code formatting:

```bash
black .
isort .
```
Parallel Fan-Out (Scatter-Gather)
BaseGraph exposes add_parallel_nodes for wiring an intra-graph fan-out: a single node dispatches to multiple branches that run concurrently (via LangGraph's Send API), and a merge node aggregates their results.
State setup
Use operator.add (or any reducer) with Annotated so that concurrent branches can each append to the same list field without overwriting each other:
```python
import operator
from typing import Annotated

from black_langcube.graf.graph_base import GraphState

class FanOutState(GraphState):
    topic: str
    branch_results: Annotated[list, operator.add]  # reducer – each branch appends
    merged_summary: str
```
Graph wiring
```python
from langgraph.graph import START, END

from black_langcube.graf.graph_base import BaseGraph

class MyFanOutGraph(BaseGraph):
    def __init__(self, topic, folder, language="English"):
        super().__init__(FanOutState, topic, folder, language)
        self._topic = topic
        self._build()

    def _build(self):
        def prepare(state):
            return {}  # fan-out source

        def branch_a(state):
            return {"branch_results": [f"A: {state['topic']}"]}

        def branch_b(state):
            return {"branch_results": [f"B: {state['topic']}"]}

        def merge(state):
            return {"merged_summary": " | ".join(state["branch_results"])}

        self.add_node("prepare", prepare)
        self.add_node("branch_a", branch_a)
        self.add_node("branch_b", branch_b)
        self.add_node("merge", merge)

        self.add_edge(START, "prepare")
        # Wire fan-out → concurrent branches → merge
        self.add_parallel_nodes("prepare", ["branch_a", "branch_b"], "merge")
        self.add_edge("merge", END)

    @property
    def workflow_name(self):
        return "my_fanout"
```
A custom router_fn can be supplied to control what state each branch
receives:
```python
from langgraph.types import Send

def router(state):
    return [
        Send("branch_a", {**state, "mode": "fast"}),
        Send("branch_b", {**state, "mode": "thorough"}),
    ]

# Inside _build(), replacing the default fan-out wiring:
self.add_parallel_nodes("prepare", ["branch_a", "branch_b"], "merge", router_fn=router)
```
Pipeline-level parallelism
To run independent graph instances simultaneously, use run_parallel_pipeline:
```python
import asyncio

from black_langcube import run_parallel_pipeline

graph_a = MyFanOutGraph("topic A", "output/a")
graph_b = MyFanOutGraph("topic B", "output/b")

results = asyncio.run(run_parallel_pipeline([graph_a, graph_b]))
# results["status"] → "completed" | "partial_failure"
# results["parallel_results"] → [result_a, result_b]
```
See `src/black_langcube/examples/parallel_fanout_workflow.py` for a fully working end-to-end example.
📋 Requirements
- Python 3.9+
- LangChain >= 0.3.24
- LangGraph >= 0.3.7
- Pydantic >= 2.0.0
- OpenAI API access
🤝 Contributing
This is a work in progress and contributions are welcome! Please feel free to:
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests if applicable
- Submit a pull request
📄 License
MIT License
⚠️ Note
This library is intended to be used within a larger application context. The code is provided as-is and is actively being improved. Take it with a grain of salt and feel free to contribute improvements!