Skip to main content

Celery DI/AOP integration plugin for Spakky Framework

Project description

spakky-celery

Spakky Framework를 위한 Celery 통합 플러그인입니다. @task/@schedule metadata를 Celery task dispatch와 beat schedule로 자동 연결합니다.

AOP 기반 자동 task dispatch와 주기 schedule 등록을 제공합니다. @task 또는 @schedule로 장식된 메서드는 명시적인 dispatcher 호출 없이 가로채져 Celery로 라우팅됩니다.

설치

pip install spakky-celery

필수 의존성: spakky-task, spakky-tracing(의존성으로 자동 설치)

주요 기능

  • AOP 기반 dispatch: @task 메서드는 aspect가 가로채며 수동 dispatch() 호출이 필요 없습니다.
  • Broker dispatch: 모든 @task 호출은 send_task()를 통해 Celery broker로 전송됩니다.
  • Schedule 등록: @schedule 메서드는 Celery Beat에 자동 등록됩니다.
  • Worker context 감지: context key로 worker 내부 재dispatch 방지
  • 자동 등록: @TaskHandler pod를 스캔해 Celery task로 자동 등록합니다.
  • 기본 Celery 앱: CeleryConfig 기반 Celery 앱을 Pod로 자동 등록합니다.
  • 전체 설정: broker URL, serializer, timezone 등을 환경변수로 설정
  • AuthContextSnapshot 전파: spakky-auth snapshot propagation이 활성화되면 raw bearer token 대신 signed snapshot을 task header에 담아 보호된 worker task를 fail-closed로 실행합니다.

설정

SPAKKY_CELERY__ 접두사로 환경변수를 설정합니다.

변수 기본값 설명
SPAKKY_CELERY__BROKER_URL (필수) Celery broker URL(예: amqp://user:pass@host:5672//)
SPAKKY_CELERY__RESULT_BACKEND None result backend URL. None이면 result storage 비활성
SPAKKY_CELERY__APP_NAME spakky-celery Celery application 이름
SPAKKY_CELERY__TASK_SERIALIZER json task message serializer (json, pickle, yaml, msgpack)
SPAKKY_CELERY__RESULT_SERIALIZER json result serializer
SPAKKY_CELERY__ACCEPT_CONTENT ["json"] 허용 content type
SPAKKY_CELERY__TIMEZONE UTC IANA timezone(예: Asia/Seoul)
SPAKKY_CELERY__ENABLE_UTC true 내부 datetime 처리에 UTC 사용

사용법

1. Task handler 정의

from datetime import time, timedelta

from spakky.task import TaskHandler, Crontab, Weekday, task, schedule


@TaskHandler()
class EmailTaskHandler:
    @task
    def send_email(self, to: str, subject: str, body: str) -> None:
        """Dispatched to Celery broker via send_task()."""
        send_smtp(to, subject, body)


@TaskHandler()
class MaintenanceHandler:
    @schedule(interval=timedelta(minutes=30))
    def health_check(self) -> None:
        """Registered as Celery Beat periodic task — runs every 30 minutes."""
        ...

    @schedule(at=time(3, 0))
    def daily_cleanup(self) -> None:
        """Runs daily at 03:00."""
        ...

    @schedule(crontab=Crontab(weekday=Weekday.MONDAY, hour=9))
    def weekly_report(self) -> None:
        """Runs every Monday at 09:00."""
        ...

2. 애플리케이션 부트스트랩

from spakky.core.application.application import SpakkyApplication
from spakky.core.application.application_context import ApplicationContext

import spakky.plugins.celery

app = (
    SpakkyApplication(ApplicationContext())
    .load_plugins(include={spakky.plugins.celery.PLUGIN_NAME})
    .scan()
    .start()
)

커스텀 Celery 인스턴스를 써야 한다면 플러그인을 로드하기 전에 Celery 반환 Pod를 먼저 등록하세요.

3. Task 메서드 일반 호출

handler = app.container.get(EmailTaskHandler)

# broker로 dispatch됨: AOP가 가로채 send_task() 호출
handler.send_email("user@example.com", "Hello", "World")

AOP aspect가 호출을 가로채 Celery로 자동 라우팅합니다.

Dispatch 동작

데코레이터 동작 Celery API
@task 호출마다 broker로 dispatch send_task()
@schedule Celery Beat에 등록 beat_schedule

구성 요소

구성 요소 설명
CeleryConfig 환경변수에서 로드되는 설정
CeleryPostProcessor @TaskHandler pod를 스캔하고 메서드를 Celery task/schedule로 등록
CeleryTaskDispatchAspect 동기 @task 호출을 가로채는 AOP aspect
AsyncCeleryTaskDispatchAspect 비동기 @task 호출을 가로채는 AOP aspect

에러

에러 설명
InvalidScheduleRouteError ScheduleRoute has no valid schedule specification

분산 트레이싱

spakky-tracing은 필수 의존성으로 자동 설치됩니다. ITracePropagator가 컨테이너에 등록되어 있으면 태스크 디스패치 시 TraceContext가 자동으로 전파됩니다.

  • 디스패치 측: @task 호출 시 현재 TraceContext를 Celery 메시지 헤더에 주입합니다
  • 워커 측: 수신 태스크에서 TraceContext를 추출하여 자식 스팬을 생성합니다
  • 헤더가 없으면 새로운 루트 트레이스를 시작합니다

인증/인가 스냅샷 전파

SPAKKY_AUTH_SNAPSHOT_PROPAGATION_ENABLED=trueIAuthContextSnapshotSigner / IAuthContextSnapshotVerifier provider가 등록되어 있으면, CeleryTaskDispatchAspectAsyncCeleryTaskDispatchAspect는 현재 request/context scope의 AuthContext를 signed AuthContextSnapshot으로 직렬화해 Celery task header spakky.auth.context_snapshot에 저장합니다.

Worker endpoint는 task 실행 시작 시 ApplicationContext.clear_context()spakky.plugins.celery.task_context를 설정하고, 보호된 task metadata가 있으면 snapshot을 검증해 AuthContext를 다시 seed합니다. 이후 @protected, @require_scope, @require_role, @require_permission, @require_policy, @require_relation requirement를 worker boundary에서 fail-closed로 평가합니다.

상황 결과
snapshot missing / invalid / expired CHALLENGE decision을 담은 task failure
requirement DENY retry 없이 task failure
verifier 또는 auth provider unavailable ERROR Celery retryable task error

Raw bearer token은 task/broker header로 전파하지 않습니다. 같은 worker process 안에서 재호출되는 task는 기존 Celery task context를 감지해 재dispatch하지 않고 직접 실행합니다.

관련 패키지

  • spakky-task: core task 추상화(@TaskHandler, @task, @schedule, Crontab)
  • spakky-rabbitmq: RabbitMQ event transport(Celery broker로도 사용 가능)
  • spakky-tracing: 분산 트레이싱 추상화(필수, context propagation 활성화)

라이선스

MIT License

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

spakky_celery-6.9.1.tar.gz (11.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

spakky_celery-6.9.1-py3-none-any.whl (16.0 kB view details)

Uploaded Python 3

File details

Details for the file spakky_celery-6.9.1.tar.gz.

File metadata

  • Download URL: spakky_celery-6.9.1.tar.gz
  • Upload date:
  • Size: 11.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for spakky_celery-6.9.1.tar.gz
Algorithm Hash digest
SHA256 219c577daa763f8603debbd1cfede06e961334803dc83d0c78c47865bfa5035c
MD5 5ada2adab6da5907d4ef471f77ef468e
BLAKE2b-256 7c032d8cbfc42805a1a8f98d1b7d0e192a9f739b3cd7e56cf9034dee54fad2af

See more details on using hashes here.

Provenance

The following attestation bundles were made for spakky_celery-6.9.1.tar.gz:

Publisher: release.yml on E5presso/spakky-framework

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file spakky_celery-6.9.1-py3-none-any.whl.

File metadata

  • Download URL: spakky_celery-6.9.1-py3-none-any.whl
  • Upload date:
  • Size: 16.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for spakky_celery-6.9.1-py3-none-any.whl
Algorithm Hash digest
SHA256 e5fe5435ced9c665722a7b2295fc039ec5c3548bf139e15278c342028b6310cc
MD5 11afc359df30e21d04decf3f4fab7ddb
BLAKE2b-256 63c250c72384109b8665f39c64d779ef7c963ec34848618f799022240e9c43ea

See more details on using hashes here.

Provenance

The following attestation bundles were made for spakky_celery-6.9.1-py3-none-any.whl:

Publisher: release.yml on E5presso/spakky-framework

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page