Skip to main content

SQLAlchemy plugin for Spakky framework

Project description

Spakky SQLAlchemy

Spakky Framework를 위한 SQLAlchemy 통합 플러그인입니다. Connection/session/transaction Pod와 outbox/agent persistence contribution을 SQLAlchemy로 제공합니다.

설치

pip install spakky-sqlalchemy

Spakky extras로도 설치할 수 있습니다.

pip install spakky[sqlalchemy]

PostgreSQL 예제를 그대로 따라간다면 driver까지 포함하는 extra를 사용하세요.

pip install "spakky[database-postgres]"

설정

SPAKKY_SQLALCHEMY__ 접두사로 환경변수를 설정합니다.

# 필수
export SPAKKY_SQLALCHEMY__CONNECTION_STRING="postgresql+psycopg://user:pass@localhost/db"

# Engine option(선택)
export SPAKKY_SQLALCHEMY__ECHO="false"
export SPAKKY_SQLALCHEMY__ECHO_POOL="false"

# Connection pool option(선택)
export SPAKKY_SQLALCHEMY__POOL_SIZE="5"
export SPAKKY_SQLALCHEMY__POOL_MAX_OVERFLOW="10"
export SPAKKY_SQLALCHEMY__POOL_TIMEOUT="30.0"
export SPAKKY_SQLALCHEMY__POOL_RECYCLE="-1"
export SPAKKY_SQLALCHEMY__POOL_PRE_PING="false"

# Session option(선택)
export SPAKKY_SQLALCHEMY__SESSION_AUTOFLUSH="true"
export SPAKKY_SQLALCHEMY__SESSION_EXPIRE_ON_COMMIT="true"

# Transaction option(선택)
export SPAKKY_SQLALCHEMY__AUTOCOMMIT="true"

# 비동기 지원(선택)
export SPAKKY_SQLALCHEMY__SUPPORT_ASYNC_MODE="true"

postgresql+psycopg:// connection string은 psycopg driver가 필요합니다. spakky[database-postgres]를 쓰거나 프로젝트 의존성에 psycopg[binary]를 직접 추가하세요.

사용법

전체 흐름

spakky-sqlalchemyspakky-data의 추상 계약을 SQLAlchemy로 구현합니다.

  1. 도메인 Aggregate를 정의합니다.
  2. @Table(Domain)AbstractMappableTable[Domain]으로 ORM table을 매핑합니다.
  3. AbstractGenericRepository 또는 AbstractAsyncGenericRepository를 상속한 @Repository()를 등록합니다.
  4. Command UseCase에는 Repository를 주입하고 @Transactional()로 transaction 경계를 둡니다.
  5. 복잡한 조회는 Repository에 find_by_* 메서드를 추가하지 않고 QueryUseCase에서 SessionManager / AsyncSessionManager를 직접 사용합니다.

자세한 end-to-end 예제는 데이터베이스 가이드를 참고하세요.

도메인 매핑 테이블 정의

@Table decorator와 AbstractMappableTable 상속으로 domain model mapping이 있는 ORM table을 정의합니다.

from typing import Self
from uuid import UUID

from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column

from spakky.domain.models.aggregate_root import AbstractAggregateRoot
from spakky.plugins.sqlalchemy.orm.table import AbstractMappableTable, Table

# Domain model
class User(AbstractAggregateRoot[UUID]):
    name: str
    email: str

# domain mapping을 가진 table 정의
@Table(User)
class UserTable(AbstractMappableTable[User]):
    __tablename__ = "users"

    uid: Mapped[UUID] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(255))
    email: Mapped[str] = mapped_column(String(255))

    @classmethod
    def from_domain(cls, domain: User) -> Self:
        return cls(uid=domain.uid, name=domain.name, email=domain.email)

    def to_domain(self) -> User:
        return User(uid=self.uid, name=self.name, email=self.email)

Repository 구현

AbstractGenericRepository 또는 AbstractAsyncGenericRepository를 확장합니다:

from uuid import UUID
from spakky.data.stereotype.repository import Repository
from spakky.plugins.sqlalchemy.persistency.repository import (
    AbstractGenericRepository,
    AbstractAsyncGenericRepository,
)

# 동기 repository
@Repository()
class UserRepository(AbstractGenericRepository[User, UUID]):
    pass  # CRUD method 상속

# 비동기 repository
@Repository()
class AsyncUserRepository(AbstractAsyncGenericRepository[User, UUID]):
    pass  # 비동기 CRUD method 상속

트랜잭션 사용

spakky-data@Transactional decorator를 사용합니다.

from spakky.core.stereotype.usecase import UseCase
from spakky.data.aspects.transactional import Transactional

@UseCase()
class CreateUserUseCase:
    def __init__(self, user_repo: UserRepository) -> None:
        self._user_repo = user_repo

    @Transactional()
    def execute(self, name: str, email: str) -> User:
        user = User.create(name, email)
        return self._user_repo.save(user)

비동기 트랜잭션

같은 @Transactional() decorator가 동기와 비동기 메서드 모두에서 동작합니다. 프레임워크는 메서드가 coroutine인지에 따라 올바른 aspect를 자동 선택합니다.

from spakky.data.aspects.transactional import Transactional

@UseCase()
class AsyncCreateUserUseCase:
    def __init__(self, user_repo: AsyncUserRepository) -> None:
        self._user_repo = user_repo

    @Transactional()
    async def execute(self, name: str, email: str) -> User:
        user = User.create(name, email)
        return await self._user_repo.save(user)

Session 직접 접근

복잡한 query는 QueryUseCase에서 SQLAlchemy session에 직접 접근합니다. CQRS 원칙에 따라 query는 repository에 query 메서드를 추가하지 않고 직접 구현해야 합니다.

from spakky.core.common.mutability import immutable
from spakky.core.stereotype.usecase import UseCase
from spakky.domain.application.query import AbstractQuery, IAsyncQueryUseCase
from spakky.plugins.sqlalchemy.persistency.session_manager import AsyncSessionManager
from sqlalchemy import select


@immutable
class FindUserByEmailQuery(AbstractQuery):
    email: str


@immutable
class UserDTO:
    uid: UUID
    name: str
    email: str


@UseCase()
class FindUserByEmailUseCase(IAsyncQueryUseCase[FindUserByEmailQuery, UserDTO | None]):
    def __init__(self, session_manager: AsyncSessionManager) -> None:
        self._session_manager = session_manager

    async def run(self, query: FindUserByEmailQuery) -> UserDTO | None:
        result = await self._session_manager.session.execute(
            select(UserTable).where(UserTable.email == query.email)
        )
        table = result.scalar_one_or_none()
        if table is None:
            return None
        return UserDTO(uid=table.uid, name=table.name, email=table.email)

Schema Registry

migration용 table metadata를 얻으려면 SchemaRegistry에 접근합니다:

from spakky.plugins.sqlalchemy.orm.schema_registry import SchemaRegistry

@Pod()
class MigrationService:
    def __init__(self, schema_registry: SchemaRegistry) -> None:
        self._schema_registry = schema_registry

    def get_metadata(self) -> MetaData:
        return self._schema_registry.metadata

Outbox Contribution

spakky-sqlalchemy base plugin은 SQLAlchemy connection, session, transaction, schema registry만 등록합니다. SQLAlchemy 기반 Outbox storage/table은 별도 feature contribution으로 제공합니다.

Outbox integration이 필요하면 spakky-outbox와 함께 설치하거나 SQLAlchemy plugin의 outbox extra를 사용합니다.

pip install "spakky-sqlalchemy[outbox]"
[project.entry-points."spakky.contributions.spakky.outbox"]
spakky-sqlalchemy = "spakky.plugins.sqlalchemy.contributions.outbox:initialize"

이 contribution은 spakky-outbox feature와 spakky-sqlalchemy provider가 모두 active일 때 base plugin 이후 로드됩니다. load_plugins(include=...)를 사용한다면 include set에 두 plugin을 모두 넣어야 합니다. save()는 현재 transactional session을 사용하므로 비즈니스 데이터와 Outbox 메시지가 같은 DB transaction 안에서 commit 또는 rollback됩니다.

from spakky.plugins.sqlalchemy.outbox.storage import AsyncSqlAlchemyOutboxStorage

storage = app.container.get(type_=AsyncSqlAlchemyOutboxStorage)

Agent Persistence Contribution

spakky-agent의 durable state, signal, evidence repository는 production in-memory fallback을 두지 않고 provider contribution으로만 공급됩니다. SQLAlchemy 구현은 별도 spakky-agent-sqlalchemy 패키지를 만들지 않고 이 패키지의 agent extra와 contribution entry point로 제공합니다.

pip install "spakky-sqlalchemy[agent]"
[project.entry-points."spakky.contributions.spakky.agent"]
spakky-sqlalchemy = "spakky.plugins.sqlalchemy.contributions.agent:initialize"

spakky-agentspakky-sqlalchemy base plugin이 함께 active이면 contribution loader가 AgentStateTable, AgentSignalTable, AgentEvidenceTable과 세 repository 구현을 등록합니다. AgentExecutionSpec(recovery=RecoveryStrategy.ACTION_BOUNDARY) 또는 accepted_signals를 사용하는 Agent는 bootstrap 시 IAgentStateRepository, IAgentSignalRepository, IAgentEvidenceRepository를 모두 요구하므로, 이 contribution이 누락되면 core가 AgentPersistenceConfigurationError로 실패합니다. 이 패키지는 운영용 in-memory fallback을 제공하지 않습니다.

spakky-agentspakky-sqlalchemy가 모두 active이면 다음 Pod과 schema가 등록됩니다.

  • SqlAlchemyAgentStateRepository
  • SqlAlchemyAgentSignalRepository
  • SqlAlchemyAgentEvidenceRepository
  • spakky_agent_state, spakky_agent_signal, spakky_agent_evidence

각 repository는 현재 SQLAlchemy transactional session을 사용합니다. Signal은 consumed_at으로 pending queue를 구분하고, Evidence repository는 append/read 메서드만 노출해 agent-facing append-only 계약을 유지합니다.

주요 기능

  • Domain-Table mapping: domain model과 ORM table 간 양방향 변환
  • Generic repository: composite PK를 지원하는 사전 구성 CRUD operation
  • 동기/비동기 지원: 동기 및 비동기 operation 모두 지원
  • Scoped session: thread/context-safe session 관리
  • 낙관적 lock: 동시 update를 위한 내장 VersionConflictError
  • Schema registry: 중앙화된 table metadata 관리

구성 요소

컴포넌트 설명
@Table domain mapping이 있는 ORM table 등록 decorator
AbstractTable ORM table 기반 클래스(domain mapping 없는 infrastructure table)
AbstractMappableTable from_domain/to_domain domain mapping을 가진 generic table
AbstractGenericRepository CRUD operation을 가진 동기 repository
AbstractAsyncGenericRepository CRUD operation을 가진 비동기 repository
SchemaRegistry table-domain mapping 중앙 registry
SessionManager 동기 scoped session 관리
AsyncSessionManager 비동기 scoped session 관리
Transaction 동기 transaction 구현
AsyncTransaction 비동기 transaction 구현
ConnectionManager 동기 SQLAlchemy engine lifecycle
AsyncConnectionManager 비동기 SQLAlchemy engine lifecycle
SQLAlchemyConnectionConfig 환경변수 기반 설정
SqlAlchemyAgentStateRepository IAgentStateRepository의 SQLAlchemy 구현
SqlAlchemyAgentSignalRepository IAgentSignalRepository의 SQLAlchemy 구현. consumed_at으로 pending signal queue를 구분
SqlAlchemyAgentEvidenceRepository IAgentEvidenceRepository의 SQLAlchemy 구현. append/read만 노출

Repository 메서드

동기/비동기 repository는 모두 다음을 제공합니다:

메서드 설명
get(id) ID로 aggregate 조회, 없으면 EntityNotFoundError 발생
get_or_none(id) ID로 aggregate 조회, 없으면 None 반환
contains(id) aggregate 존재 여부 확인
range(ids) ID 목록으로 여러 aggregate 조회
save(aggregate) aggregate 저장(insert 또는 update)
save_all(aggregates) 여러 aggregate 저장
delete(aggregate) aggregate 삭제
delete_all(aggregates) 여러 aggregate 삭제

라이선스

MIT License

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

spakky_sqlalchemy-6.8.0.tar.gz (16.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

spakky_sqlalchemy-6.8.0-py3-none-any.whl (26.4 kB view details)

Uploaded Python 3

File details

Details for the file spakky_sqlalchemy-6.8.0.tar.gz.

File metadata

  • Download URL: spakky_sqlalchemy-6.8.0.tar.gz
  • Upload date:
  • Size: 16.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for spakky_sqlalchemy-6.8.0.tar.gz
Algorithm Hash digest
SHA256 cd080dd3db505355e5602e8445b22b8a1f023ff1a38a655b746b2fec8472cc6b
MD5 e03fa761744052e8e770b76aa76f7953
BLAKE2b-256 c26c42d2ecbcb578fe3210b48db2ac5bc4e475b2ca0885460b25fcf6ffe5082e

See more details on using hashes here.

Provenance

The following attestation bundles were made for spakky_sqlalchemy-6.8.0.tar.gz:

Publisher: release.yml on E5presso/spakky-framework

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file spakky_sqlalchemy-6.8.0-py3-none-any.whl.

File metadata

File hashes

Hashes for spakky_sqlalchemy-6.8.0-py3-none-any.whl
Algorithm Hash digest
SHA256 52d05b2869499e8fc7d8469b8adeb355ce3b1959c15cf297fd7cb4fac89f1e52
MD5 c97ef8448461a57bd503828f5116561f
BLAKE2b-256 2f915cb7151b9efa9b662a36c17e1d04f9e049fce350f631741e18e18f69daa9

See more details on using hashes here.

Provenance

The following attestation bundles were made for spakky_sqlalchemy-6.8.0-py3-none-any.whl:

Publisher: release.yml on E5presso/spakky-framework

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page