SQLAlchemy plugin for Spakky framework
Project description
Spakky SQLAlchemy
Spakky Framework를 위한 SQLAlchemy 통합 플러그인입니다.
설치
pip install spakky-sqlalchemy
Spakky extras로도 설치할 수 있습니다.
pip install spakky[sqlalchemy]
PostgreSQL 예제를 그대로 따라간다면 driver까지 포함하는 extra를 사용하세요.
pip install "spakky[database-postgres]"
설정
SPAKKY_SQLALCHEMY__ 접두사로 환경변수를 설정합니다.
# 필수
export SPAKKY_SQLALCHEMY__CONNECTION_STRING="postgresql+psycopg://user:pass@localhost/db"
# Engine option(선택)
export SPAKKY_SQLALCHEMY__ECHO="false"
export SPAKKY_SQLALCHEMY__ECHO_POOL="false"
# Connection pool option(선택)
export SPAKKY_SQLALCHEMY__POOL_SIZE="5"
export SPAKKY_SQLALCHEMY__POOL_MAX_OVERFLOW="10"
export SPAKKY_SQLALCHEMY__POOL_TIMEOUT="30.0"
export SPAKKY_SQLALCHEMY__POOL_RECYCLE="-1"
export SPAKKY_SQLALCHEMY__POOL_PRE_PING="false"
# Session option(선택)
export SPAKKY_SQLALCHEMY__SESSION_AUTOFLUSH="true"
export SPAKKY_SQLALCHEMY__SESSION_EXPIRE_ON_COMMIT="true"
# Transaction option(선택)
export SPAKKY_SQLALCHEMY__AUTOCOMMIT="true"
# 비동기 지원(선택)
export SPAKKY_SQLALCHEMY__SUPPORT_ASYNC_MODE="true"
postgresql+psycopg:// connection string은 psycopg driver가 필요합니다.
spakky[database-postgres]를 쓰거나 프로젝트 의존성에 psycopg[binary]를 직접 추가하세요.
사용법
전체 흐름
spakky-sqlalchemy는 spakky-data의 추상 계약을 SQLAlchemy로 구현합니다.
- 도메인 Aggregate를 정의합니다.
@Table(Domain)과AbstractMappableTable[Domain]으로 ORM table을 매핑합니다.AbstractGenericRepository또는AbstractAsyncGenericRepository를 상속한@Repository()를 등록합니다.- Command UseCase에는 Repository를 주입하고
@Transactional()로 transaction 경계를 둡니다. - 복잡한 조회는 Repository에
find_by_*메서드를 추가하지 않고 QueryUseCase에서SessionManager/AsyncSessionManager를 직접 사용합니다.
자세한 end-to-end 예제는 데이터베이스 가이드를 참고하세요.
도메인 매핑 테이블 정의
@Table decorator와 AbstractMappableTable 상속으로 domain model mapping이 있는 ORM table을 정의합니다.
from typing import Self
from uuid import UUID
from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column
from spakky.domain.models.aggregate_root import AbstractAggregateRoot
from spakky.plugins.sqlalchemy.orm.table import AbstractMappableTable, Table
# Domain model
class User(AbstractAggregateRoot[UUID]):
name: str
email: str
# domain mapping을 가진 table 정의
@Table(User)
class UserTable(AbstractMappableTable[User]):
__tablename__ = "users"
uid: Mapped[UUID] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(255))
email: Mapped[str] = mapped_column(String(255))
@classmethod
def from_domain(cls, domain: User) -> Self:
return cls(uid=domain.uid, name=domain.name, email=domain.email)
def to_domain(self) -> User:
return User(uid=self.uid, name=self.name, email=self.email)
Repository 구현
AbstractGenericRepository 또는 AbstractAsyncGenericRepository를 확장합니다:
from uuid import UUID
from spakky.data.stereotype.repository import Repository
from spakky.plugins.sqlalchemy.persistency.repository import (
AbstractGenericRepository,
AbstractAsyncGenericRepository,
)
# 동기 repository
@Repository()
class UserRepository(AbstractGenericRepository[User, UUID]):
pass # CRUD method 상속
# 비동기 repository
@Repository()
class AsyncUserRepository(AbstractAsyncGenericRepository[User, UUID]):
pass # 비동기 CRUD method 상속
트랜잭션 사용
spakky-data의 @Transactional decorator를 사용합니다.
from spakky.core.stereotype.usecase import UseCase
from spakky.data.aspects.transactional import Transactional
@UseCase()
class CreateUserUseCase:
def __init__(self, user_repo: UserRepository) -> None:
self._user_repo = user_repo
@Transactional()
def execute(self, name: str, email: str) -> User:
user = User.create(name, email)
return self._user_repo.save(user)
비동기 트랜잭션
같은 @Transactional() decorator가 동기와 비동기 메서드 모두에서 동작합니다.
프레임워크는 메서드가 coroutine인지에 따라 올바른 aspect를 자동 선택합니다.
from spakky.data.aspects.transactional import Transactional
@UseCase()
class AsyncCreateUserUseCase:
def __init__(self, user_repo: AsyncUserRepository) -> None:
self._user_repo = user_repo
@Transactional()
async def execute(self, name: str, email: str) -> User:
user = User.create(name, email)
return await self._user_repo.save(user)
Session 직접 접근
복잡한 query는 QueryUseCase에서 SQLAlchemy session에 직접 접근합니다. CQRS 원칙에 따라 query는 repository에 query 메서드를 추가하지 않고 직접 구현해야 합니다.
from spakky.core.common.mutability import immutable
from spakky.core.stereotype.usecase import UseCase
from spakky.domain.application.query import AbstractQuery, IAsyncQueryUseCase
from spakky.plugins.sqlalchemy.persistency.session_manager import AsyncSessionManager
from sqlalchemy import select
@immutable
class FindUserByEmailQuery(AbstractQuery):
email: str
@immutable
class UserDTO:
uid: UUID
name: str
email: str
@UseCase()
class FindUserByEmailUseCase(IAsyncQueryUseCase[FindUserByEmailQuery, UserDTO | None]):
def __init__(self, session_manager: AsyncSessionManager) -> None:
self._session_manager = session_manager
async def run(self, query: FindUserByEmailQuery) -> UserDTO | None:
result = await self._session_manager.session.execute(
select(UserTable).where(UserTable.email == query.email)
)
table = result.scalar_one_or_none()
if table is None:
return None
return UserDTO(uid=table.uid, name=table.name, email=table.email)
Schema Registry
migration용 table metadata를 얻으려면 SchemaRegistry에 접근합니다:
from spakky.plugins.sqlalchemy.orm.schema_registry import SchemaRegistry
@Pod()
class MigrationService:
def __init__(self, schema_registry: SchemaRegistry) -> None:
self._schema_registry = schema_registry
def get_metadata(self) -> MetaData:
return self._schema_registry.metadata
Outbox Contribution
spakky-sqlalchemy base plugin은 SQLAlchemy connection, session, transaction,
schema registry만 등록합니다. SQLAlchemy 기반 Outbox storage/table은 별도 feature
contribution으로 제공합니다.
Outbox integration이 필요하면 spakky-outbox와 함께 설치하거나 SQLAlchemy plugin의
outbox extra를 사용합니다.
pip install "spakky-sqlalchemy[outbox]"
[project.entry-points."spakky.contributions.spakky.outbox"]
spakky-sqlalchemy = "spakky.plugins.sqlalchemy.contributions.outbox:initialize"
이 contribution은 spakky-outbox feature와 spakky-sqlalchemy provider가 모두
active일 때 base plugin 이후 로드됩니다. load_plugins(include=...)를 사용한다면
include set에 두 plugin을 모두 넣어야 합니다. save()는 현재 transactional session을
사용하므로 비즈니스 데이터와 Outbox 메시지가 같은 DB transaction 안에서 commit 또는
rollback됩니다.
from spakky.plugins.sqlalchemy.outbox.storage import AsyncSqlAlchemyOutboxStorage
storage = app.container.get(type_=AsyncSqlAlchemyOutboxStorage)
Agent Persistence Contribution
spakky-agent의 durable state, signal, evidence repository는 production
in-memory fallback을 두지 않고 provider contribution으로만 공급됩니다.
SQLAlchemy 구현은 별도 spakky-agent-sqlalchemy 패키지를 만들지 않고 이
패키지의 agent extra와 contribution entry point로 제공합니다.
pip install "spakky-sqlalchemy[agent]"
[project.entry-points."spakky.contributions.spakky.agent"]
spakky-sqlalchemy = "spakky.plugins.sqlalchemy.contributions.agent:initialize"
spakky-agent와 spakky-sqlalchemy base plugin이 함께 active이면 contribution loader가
AgentStateTable, AgentSignalTable, AgentEvidenceTable과 세 repository 구현을 등록합니다.
AgentExecutionSpec(recovery=RecoveryStrategy.ACTION_BOUNDARY) 또는 accepted_signals를
사용하는 Agent는 bootstrap 시 IAgentStateRepository, IAgentSignalRepository,
IAgentEvidenceRepository를 모두 요구하므로, 이 contribution이 누락되면 core가
AgentPersistenceConfigurationError로 실패합니다. 이 패키지는 운영용 in-memory fallback을
제공하지 않습니다.
spakky-agent와 spakky-sqlalchemy가 모두 active이면 다음 Pod과 schema가
등록됩니다.
SqlAlchemyAgentStateRepositorySqlAlchemyAgentSignalRepositorySqlAlchemyAgentEvidenceRepositoryspakky_agent_state,spakky_agent_signal,spakky_agent_evidence
각 repository는 현재 SQLAlchemy transactional session을 사용합니다. Signal은
consumed_at으로 pending queue를 구분하고, Evidence repository는 append/read
메서드만 노출해 agent-facing append-only 계약을 유지합니다.
주요 기능
- Domain-Table mapping: domain model과 ORM table 간 양방향 변환
- Generic repository: composite PK를 지원하는 사전 구성 CRUD operation
- 동기/비동기 지원: 동기 및 비동기 operation 모두 지원
- Scoped session: thread/context-safe session 관리
- 낙관적 lock: 동시 update를 위한 내장
VersionConflictError - Schema registry: 중앙화된 table metadata 관리
구성 요소
| 컴포넌트 | 설명 |
|---|---|
@Table |
domain mapping이 있는 ORM table 등록 decorator |
AbstractTable |
ORM table 기반 클래스(domain mapping 없는 infrastructure table) |
AbstractMappableTable |
from_domain/to_domain domain mapping을 가진 generic table |
AbstractGenericRepository |
CRUD operation을 가진 동기 repository |
AbstractAsyncGenericRepository |
CRUD operation을 가진 비동기 repository |
SchemaRegistry |
table-domain mapping 중앙 registry |
SessionManager |
동기 scoped session 관리 |
AsyncSessionManager |
비동기 scoped session 관리 |
Transaction |
동기 transaction 구현 |
AsyncTransaction |
비동기 transaction 구현 |
ConnectionManager |
동기 SQLAlchemy engine lifecycle |
AsyncConnectionManager |
비동기 SQLAlchemy engine lifecycle |
SQLAlchemyConnectionConfig |
환경변수 기반 설정 |
SqlAlchemyAgentStateRepository |
IAgentStateRepository의 SQLAlchemy 구현 |
SqlAlchemyAgentSignalRepository |
IAgentSignalRepository의 SQLAlchemy 구현. consumed_at으로 pending signal queue를 구분 |
SqlAlchemyAgentEvidenceRepository |
IAgentEvidenceRepository의 SQLAlchemy 구현. append/read만 노출 |
Repository 메서드
동기/비동기 repository는 모두 다음을 제공합니다:
| 메서드 | 설명 |
|---|---|
get(id) |
ID로 aggregate 조회, 없으면 EntityNotFoundError 발생 |
get_or_none(id) |
ID로 aggregate 조회, 없으면 None 반환 |
contains(id) |
aggregate 존재 여부 확인 |
range(ids) |
ID 목록으로 여러 aggregate 조회 |
save(aggregate) |
aggregate 저장(insert 또는 update) |
save_all(aggregates) |
여러 aggregate 저장 |
delete(aggregate) |
aggregate 삭제 |
delete_all(aggregates) |
여러 aggregate 삭제 |
라이선스
MIT License
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file spakky_sqlalchemy-6.6.1.tar.gz.
File metadata
- Download URL: spakky_sqlalchemy-6.6.1.tar.gz
- Upload date:
- Size: 16.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
2c7a15b6b49ac3127859245a2ebb164f34730c2d82d7d062df59b9388945d809
|
|
| MD5 |
6b65d331e728751a07645c5ab82e0a23
|
|
| BLAKE2b-256 |
9267143f04c08d9ea0f612e71b9d8e2139b87399162bb3957e849b4c196aa335
|
Provenance
The following attestation bundles were made for spakky_sqlalchemy-6.6.1.tar.gz:
Publisher:
release.yml on E5presso/spakky-framework
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
spakky_sqlalchemy-6.6.1.tar.gz -
Subject digest:
2c7a15b6b49ac3127859245a2ebb164f34730c2d82d7d062df59b9388945d809 - Sigstore transparency entry: 1818093531
- Sigstore integration time:
-
Permalink:
E5presso/spakky-framework@06d465955a1e1a6c682c4a31d50f7d5c6f3a6aad -
Branch / Tag:
refs/heads/main - Owner: https://github.com/E5presso
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@06d465955a1e1a6c682c4a31d50f7d5c6f3a6aad -
Trigger Event:
workflow_dispatch
-
Statement type:
File details
Details for the file spakky_sqlalchemy-6.6.1-py3-none-any.whl.
File metadata
- Download URL: spakky_sqlalchemy-6.6.1-py3-none-any.whl
- Upload date:
- Size: 26.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
f6f748089807106e6bf68a90c5f3fad2ac1b870dbfc591f87ac0cd49aab05c24
|
|
| MD5 |
d7bdcc79afb0918d855ac50841efaffc
|
|
| BLAKE2b-256 |
e63da2cda44be89bee75090c189b0a07a8fa968c95a5100fb9bf74597cefa9b8
|
Provenance
The following attestation bundles were made for spakky_sqlalchemy-6.6.1-py3-none-any.whl:
Publisher:
release.yml on E5presso/spakky-framework
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
spakky_sqlalchemy-6.6.1-py3-none-any.whl -
Subject digest:
f6f748089807106e6bf68a90c5f3fad2ac1b870dbfc591f87ac0cd49aab05c24 - Sigstore transparency entry: 1818093673
- Sigstore integration time:
-
Permalink:
E5presso/spakky-framework@06d465955a1e1a6c682c4a31d50f7d5c6f3a6aad -
Branch / Tag:
refs/heads/main - Owner: https://github.com/E5presso
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@06d465955a1e1a6c682c4a31d50f7d5c6f3a6aad -
Trigger Event:
workflow_dispatch
-
Statement type: