GraphQL SDL generation and query optimization for SQLModel

SQLModel Nexus

From SQLModel to running GraphQL API, Core API DTOs, and MCP Server — with zero boilerplate.

Read This README in Order

We reuse one example throughout: Sprint → Task → User.

  • A Sprint has many Tasks
  • A Task has one owner (a User)
  • The API also wants derived fields such as task_count and contributors

The concepts appear in this order on purpose:

  1. GraphQL Mode — the fastest path from SQLModel to a running API
  2. Core API Mode — DefineSubset DTOs for REST endpoints, progressing from implicit auto-loading to resolve_*, post_*, and cross-layer data flow
  3. MCP Server — expose the same models to AI assistants

What sqlmodel-nexus Gives You

| Need | What you write | What the framework does |
| --- | --- | --- |
| GraphQL API | `@query` / `@mutation` on SQLModel methods | Auto-generates SDL, resolves relationships via DataLoader |
| REST / use-case DTOs | `DefineSubset` + field declarations | Implicit auto-loading, N+1 prevention, ORM→DTO conversion |
| Derived fields | `post_*` methods | Runs after all nested data is resolved |
| Cross-layer data flow | `ExposeAs`, `SendTo`, `Collector` | Pass context down or aggregate values up |
| Non-ORM relationships | `Relationship(...)` on entity | Same DataLoader infra, same auto-loading |
| AI-ready APIs | `config_simple_mcp_server(base=...)` | Progressive-disclosure MCP tools |

Install

pip install sqlmodel-nexus
pip install "sqlmodel-nexus[fastmcp]"  # with MCP support

GraphQL Mode

The fastest path: SQLModel + @query decorator → running GraphQL API.

30-Second Quick Start

from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from sqlmodel import SQLModel, Field, Relationship, select
from sqlmodel_nexus import query, mutation, GraphQLHandler

class User(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    name: str
    email: str
    posts: list["Post"] = Relationship(back_populates="author")

class Post(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    title: str
    author_id: int = Field(foreign_key="user.id")
    author: User | None = Relationship(back_populates="posts")

    @query
    async def get_all(cls, limit: int = 10) -> list['Post']:
        """Get all posts."""
        async with get_session() as session:  # get_session: your async session helper
            return (await session.exec(select(cls).limit(limit))).all()

    @mutation
    async def create(cls, title: str, author_id: int) -> 'Post':
        """Create a new post."""
        async with get_session() as session:
            post = cls(title=title, author_id=author_id)
            session.add(post)
            await session.commit()
            await session.refresh(post)
            return post

# async_session is your async sessionmaker (setup not shown)
handler = GraphQLHandler(base=SQLModel, session_factory=async_session)

class GraphQLRequest(BaseModel):
    query: str

app = FastAPI()

@app.get("/graphql", response_class=HTMLResponse)
async def graphiql():
    return handler.get_graphiql_html()

@app.post("/graphql")
async def graphql(req: GraphQLRequest):
    return await handler.execute(req.query)

Run uvicorn app:app and visit http://localhost:8000/graphql.

Relationships Auto-Resolved

Relationships are resolved automatically via DataLoader. No selectinload, no manual joins:

{
  postGetAll(limit: 5) {
    id
    title
    author { name email }
  }
}

The framework walks the GraphQL selection tree level-by-level, collects FK values, and batch-loads via DataLoader. One query per relationship, regardless of result size.
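The batching idea can be sketched in plain Python. `SimpleLoader` and `fetch_authors` below are illustrative stand-ins, not sqlmodel-nexus APIs: repeated keys are deduplicated, and the batch function runs once per level no matter how many rows requested the same relationship.

```python
import asyncio

# Illustrative sketch of the DataLoader pattern (not library internals):
# individual load(key) calls are coalesced into one batched fetch.

class SimpleLoader:
    def __init__(self, batch_fn):
        self.batch_fn = batch_fn      # async fn: list of keys -> list of values
        self.pending = {}             # key -> Future (deduplicates repeat keys)

    def load(self, key):
        if key not in self.pending:
            self.pending[key] = asyncio.get_running_loop().create_future()
        return self.pending[key]

    async def dispatch(self):
        keys = list(self.pending)
        values = await self.batch_fn(keys)   # one batched fetch for the level
        for key, value in zip(keys, values):
            self.pending[key].set_result(value)

batch_calls = []

async def fetch_authors(ids):
    batch_calls.append(list(ids))            # record how the loader batched
    return [{"id": i, "name": f"user{i}"} for i in ids]

async def main():
    loader = SimpleLoader(fetch_authors)
    # Five posts, only three distinct author ids -> one fetch, three keys.
    futures = [loader.load(i) for i in (1, 2, 2, 3, 1)]
    await loader.dispatch()
    return await asyncio.gather(*futures)

authors = asyncio.run(main())
```

After the run, `batch_calls` holds a single batch `[1, 2, 3]`, while `authors` still has one entry per requesting post, in request order.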

Pagination

Add order_by to list relationships for automatic pagination support:

class User(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    name: str
    posts: list["Post"] = Relationship(
        back_populates="author",
        sa_relationship_kwargs={"order_by": "Post.id"},
    )

handler = GraphQLHandler(base=SQLModel, session_factory=async_session, enable_pagination=True)

{
  userGetAll {
    name
    posts(limit: 3, offset: 0) {
      items { title }
      pagination { has_more total_count }
    }
  }
}

Auto-Generated Standard Queries

Skip @query decorators entirely — let the framework generate by_id and by_filter for every entity:

from sqlmodel_nexus import GraphQLHandler, AutoQueryConfig

handler = GraphQLHandler(
    base=SQLModel,
    session_factory=async_session,
    auto_query_config=AutoQueryConfig(session_factory=async_session),
)

{ userById(id: 1) { name email } }
{ userByFilter(filter: { name: "Alice" }, limit: 5) { id name } }

Core API Mode

Use Core API mode when you want the same DataLoader-based batching outside GraphQL — for FastAPI REST endpoints, service-layer response assembly, or any use-case DTO.

The concepts progress in order: auto-loading → resolve_* → post_* → cross-layer flow.

Step 1: DefineSubset + Implicit Auto-Loading

The simplest Core API case: select fields from SQLModel entities, declare relationship fields — they load automatically.

from sqlmodel import SQLModel, select
from sqlmodel_nexus import DefineSubset, ErManager

class UserDTO(DefineSubset):
    __subset__ = (User, ("id", "name"))

class TaskDTO(DefineSubset):
    __subset__ = (Task, ("id", "title", "owner_id"))
    owner: UserDTO | None = None   # name matches Task.owner relationship → auto-loaded

class SprintDTO(DefineSubset):
    __subset__ = (Sprint, ("id", "name"))
    tasks: list[TaskDTO] = []      # name matches Sprint.tasks relationship → auto-loaded

# App startup — once (async_session is your async sessionmaker)
er = ErManager(base=SQLModel, session_factory=async_session)
Resolver = er.create_resolver()

# Per request
async def get_sprints():
    async with async_session() as session:
        sprints = (await session.exec(select(Sprint))).all()
    dtos = [SprintDTO(id=s.id, name=s.name) for s in sprints]
    return await Resolver().resolve(dtos)

How it works:

  • ErManager discovers all SQLModel entities and their ORM relationships
  • create_resolver() returns a Resolver class bound to that entity graph
  • When resolving, if a field name matches a relationship and the DTO type is compatible with the target entity, it's loaded via DataLoader automatically
  • FK fields (like owner_id) are hidden from serialization output but available internally

This is the Core API equivalent of GraphQL's relationship resolution — same DataLoader batching, zero resolve_* methods needed for standard relationships.
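The matching rule can be illustrated with a toy stand-in. The dictionaries below are hypothetical, not how ErManager actually stores its entity graph; they only demonstrate the name-based rule described above.

```python
# Toy illustration of the auto-loading rule (not ErManager internals):
# a DTO field auto-loads when its name matches a relationship on the
# DTO's source entity.

ENTITY_RELATIONSHIPS = {          # entity -> {relationship name: target entity}
    "Sprint": {"tasks": "Task"},
    "Task": {"owner": "User"},
}

DTO_DECLARATIONS = {              # DTO -> (source entity, declared fields)
    "SprintDTO": ("Sprint", ["id", "name", "tasks", "task_count"]),
    "TaskDTO": ("Task", ["id", "title", "owner", "comments"]),
}

def auto_loaded_fields(dto_name: str) -> list[str]:
    entity, fields = DTO_DECLARATIONS[dto_name]
    relationships = ENTITY_RELATIONSHIPS.get(entity, {})
    return [f for f in fields if f in relationships]

print(auto_loaded_fields("SprintDTO"))  # tasks matches; task_count does not
print(auto_loaded_fields("TaskDTO"))    # owner matches; comments does not
```

Fields like `task_count` and `comments` fall through to `post_*` and `resolve_*`, covered next.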

Step 2: resolve_* for Custom Loading

Use resolve_* when implicit auto-loading doesn't fit: the field name doesn't match a relationship, or you need custom logic.

from pydantic_resolve import Loader

async def comments_loader(task_ids: list[int]) -> list[list[Comment]]:
    """Batch load comments for multiple tasks."""
    ...

class TaskDTO(DefineSubset):
    __subset__ = (Task, ("id", "title", "owner_id"))
    owner: UserDTO | None = None          # implicit — matches Task.owner
    comments: list[CommentDTO] = []       # custom — no matching relationship
    comment_count: int = 0

    def resolve_comments(self, loader=Loader(comments_loader)):
        """Load comments via a custom batch function."""
        return loader.load(self.id)

    def post_comment_count(self):
        return len(self.comments)

Loader accepts a DataLoader class or an async batch function:

# By DataLoader class
def resolve_tags(self, loader=Loader(TagLoader)):
    return loader.load(self.id)

# By async batch function
async def load_permissions(user_ids):
    ...
def resolve_permissions(self, loader=Loader(load_permissions)):
    return loader.load(self.owner_id)

A useful mental model: resolve_* means "this field needs data from outside the current node."
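The batch-function contract that `Loader` assumes can be sketched as follows. The in-memory `COMMENTS` rows are made up for illustration; a real loader would run one query such as `SELECT ... WHERE task_id IN (...)`. The key point is positional alignment: given N keys, return N results in the same order.

```python
import asyncio
from collections import defaultdict

# Hypothetical in-memory rows standing in for a comments table.
COMMENTS = [
    {"task_id": 1, "text": "LGTM"},
    {"task_id": 2, "text": "Needs tests"},
    {"task_id": 1, "text": "Ship it"},
]

async def comments_loader(task_ids: list[int]) -> list[list[dict]]:
    """Batch contract: result[i] is the comment list for task_ids[i]."""
    by_task = defaultdict(list)
    for row in COMMENTS:                    # one pass over the batch result
        by_task[row["task_id"]].append(row)
    # Align results positionally with the input keys; missing keys get [].
    return [by_task[tid] for tid in task_ids]

result = asyncio.run(comments_loader([1, 3, 2]))
```

Here `result[0]` holds the two comments for task 1, `result[1]` is empty because task 3 has none, and `result[2]` holds the single comment for task 2.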

Step 3: post_* — Derived Fields After Children Are Ready

post_* runs after all resolve_* and auto-loading completes for the current subtree. Use it for counts, aggregations, formatting — anything that depends on already-loaded data.

class SprintDTO(DefineSubset):
    __subset__ = (Sprint, ("id", "name"))
    tasks: list[TaskDTO] = []
    task_count: int = 0
    contributor_names: list[str] = []

    def post_task_count(self):
        return len(self.tasks)

    def post_contributor_names(self):
        return sorted({t.owner.name for t in self.tasks if t.owner})

Execution order for one SprintDTO:

  1. Implicit auto-load → tasks filled with TaskDTOs
  2. Each TaskDTO → implicit auto-load → owner filled
  3. post_task_count → len(self.tasks)
  4. post_contributor_names → extract unique owner names

| Question | resolve_* | post_* |
| --- | --- | --- |
| Needs external IO? | Yes | Usually no |
| Runs before descendants are ready? | Yes | No |
| Good for counts, sums, labels? | Sometimes | Yes |

Step 4: Cross-Layer Data Flow

Reach for these tools only when parent and child nodes need to coordinate.

  • ExposeAs: send ancestor data downward (parent → descendant)
  • SendTo + Collector: send child data upward (descendant → ancestor)

from typing import Annotated
from sqlmodel_nexus import ExposeAs, SendTo, Collector

class SprintDTO(DefineSubset):
    __subset__ = (Sprint, ("id", "name"))
    name: Annotated[str, ExposeAs('sprint_name')]     # expose to descendants
    tasks: list[TaskDTO] = []
    contributors: list[UserDTO] = []

    def post_contributors(self, collector=Collector('contributors')):
        return collector.values()                      # collect from descendants

class TaskDTO(DefineSubset):
    __subset__ = (Task, ("id", "title", "owner_id"))
    owner: Annotated[UserDTO | None, SendTo('contributors')] = None  # send upward
    full_title: str = ""

    def post_full_title(self, ancestor_context):
        return f"{ancestor_context['sprint_name']} / {self.title}"   # read from ancestor

Use this only when the shape of the tree matters:

  • A child needs ancestor context (sprint name, permissions)
  • A parent needs to aggregate values from many descendants (contributors, tags)

Step 5: Custom Relationships

For relationships that aren't in the ORM (cross-service calls, computed edges), declare them on the entity:

from sqlmodel_nexus import Relationship

async def tags_loader(task_ids: list[int]) -> list[list[Tag]]:
    """Batch load tags for multiple tasks."""
    ...

class Task(SQLModel, table=True):
    __relationships__ = [
        Relationship(fk="id", target=Tag, name="tags", loader=tags_loader, is_list=True)
    ]
    id: int | None = Field(default=None, primary_key=True)
    title: str

class TagDTO(DefineSubset):
    __subset__ = (Tag, ("id", "name"))

class TaskDTO(DefineSubset):
    __subset__ = (Task, ("id", "title"))
    tags: list[TagDTO] = []   # name matches custom relationship → auto-loaded

Custom relationships use the same DataLoader infrastructure and work with implicit auto-loading.


MCP Integration

Expose your SQLModel APIs to AI assistants with one function call.

Simple MCP Server

from sqlmodel_nexus.mcp import config_simple_mcp_server

mcp = config_simple_mcp_server(base=SQLModel, name="My API")
mcp.run()  # stdio mode

Tools: get_schema(), graphql_query(query), graphql_mutation(mutation).

Multi-App MCP Server

from sqlmodel_nexus.mcp import create_mcp_server

mcp = create_mcp_server(
    apps=[
        {"name": "blog", "base": BlogBase, "description": "Blog API"},
        {"name": "shop", "base": ShopBase, "description": "Shop API"},
    ],
    name="Multi-App API",
)
mcp.run()

Tools include list_apps(), list_queries(app_name), get_query_schema(name, app_name), graphql_query(query, app_name), etc.

pip install "sqlmodel-nexus[fastmcp]"

Demo

# GraphQL playground
uv run python -m demo.app
# visit localhost:8000/graphql

# Core API (REST)
uv run uvicorn demo.core_api.app:app --reload
# visit /docs

# MCP server
uv run --with fastmcp python -m demo.mcp_server

License

MIT License
