Skip to main content

Celery DI/AOP integration plugin for Spakky Framework

Project description

spakky-celery

Spakky Framework를 위한 Celery 통합 플러그인입니다.

AOP 기반 자동 task dispatch와 주기 schedule 등록을 제공합니다. @task 또는 @schedule로 장식된 메서드는 명시적인 dispatcher 호출 없이 가로채져 Celery로 라우팅됩니다.

설치

pip install spakky-celery

필수 의존성: spakky-task, spakky-tracing(의존성으로 자동 설치)

주요 기능

  • AOP 기반 dispatch: @task 메서드는 aspect가 가로채며 수동 dispatch() 호출이 필요 없습니다.
  • Broker dispatch: 모든 @task 호출은 send_task()를 통해 Celery broker로 전송됩니다.
  • Schedule 등록: @schedule 메서드는 Celery Beat에 자동 등록됩니다.
  • Worker context 감지: context key로 worker 내부 재dispatch 방지
  • 자동 등록: @TaskHandler pod를 스캔해 Celery task로 자동 등록합니다.
  • 전체 설정: broker URL, serializer, timezone 등을 환경변수로 설정

설정

SPAKKY_CELERY__ 접두사로 환경변수를 설정합니다.

변수 기본값 설명
SPAKKY_CELERY__BROKER_URL (필수) Celery broker URL(예: amqp://user:pass@host:5672//)
SPAKKY_CELERY__RESULT_BACKEND None result backend URL. None이면 result storage 비활성
SPAKKY_CELERY__APP_NAME spakky-celery Celery application 이름
SPAKKY_CELERY__TASK_SERIALIZER json task message serializer (json, pickle, yaml, msgpack)
SPAKKY_CELERY__RESULT_SERIALIZER json result serializer
SPAKKY_CELERY__ACCEPT_CONTENT ["json"] 허용 content type
SPAKKY_CELERY__TIMEZONE UTC IANA timezone(예: Asia/Seoul)
SPAKKY_CELERY__ENABLE_UTC true 내부 datetime 처리에 UTC 사용

사용법

1. Task handler 정의

from datetime import time, timedelta

from spakky.task import TaskHandler, Crontab, Weekday, task, schedule


@TaskHandler()
class EmailTaskHandler:
    @task
    def send_email(self, to: str, subject: str, body: str) -> None:
        """Dispatched to Celery broker via send_task()."""
        send_smtp(to, subject, body)


@TaskHandler()
class MaintenanceHandler:
    @schedule(interval=timedelta(minutes=30))
    def health_check(self) -> None:
        """Registered as Celery Beat periodic task — runs every 30 minutes."""
        ...

    @schedule(at=time(3, 0))
    def daily_cleanup(self) -> None:
        """Runs daily at 03:00."""
        ...

    @schedule(crontab=Crontab(weekday=Weekday.MONDAY, hour=9))
    def weekly_report(self) -> None:
        """Runs every Monday at 09:00."""
        ...

2. 애플리케이션 부트스트랩

from spakky.core.application.application import SpakkyApplication
from spakky.core.application.application_context import ApplicationContext

import spakky.plugins.celery

app = (
    SpakkyApplication(ApplicationContext())
    .load_plugins(include={spakky.plugins.celery.PLUGIN_NAME})
    .scan()
    .start()
)

3. Task 메서드 일반 호출

handler = app.container.get(EmailTaskHandler)

# broker로 dispatch됨: AOP가 가로채 send_task() 호출
handler.send_email("user@example.com", "Hello", "World")

AOP aspect가 호출을 가로채 Celery로 자동 라우팅합니다.

Dispatch 동작

데코레이터 동작 Celery API
@task 호출마다 broker로 dispatch send_task()
@schedule Celery Beat에 등록 beat_schedule

구성 요소

구성 요소 설명
CeleryConfig 환경변수에서 로드되는 설정
CeleryPostProcessor @TaskHandler pod를 스캔하고 메서드를 Celery task/schedule로 등록
CeleryTaskDispatchAspect 동기 @task 호출을 가로채는 AOP aspect
AsyncCeleryTaskDispatchAspect 비동기 @task 호출을 가로채는 AOP aspect

에러

에러 설명
InvalidScheduleRouteError ScheduleRoute has no valid schedule specification

분산 트레이싱

spakky-tracing은 필수 의존성으로 자동 설치됩니다. ITracePropagator가 컨테이너에 등록되어 있으면 태스크 디스패치 시 TraceContext가 자동으로 전파됩니다.

  • 디스패치 측: @task 호출 시 현재 TraceContext를 Celery 메시지 헤더에 주입합니다
  • 워커 측: 수신 태스크에서 TraceContext를 추출하여 자식 스팬을 생성합니다
  • 헤더가 없으면 새로운 루트 트레이스를 시작합니다

관련 패키지

  • spakky-task: core task 추상화(@TaskHandler, @task, @schedule, Crontab)
  • spakky-rabbitmq: RabbitMQ event transport(Celery broker로도 사용 가능)
  • spakky-tracing: 분산 트레이싱 추상화(필수, context propagation 활성화)

라이선스

MIT License

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

spakky_celery-6.5.0.tar.gz (8.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

spakky_celery-6.5.0-py3-none-any.whl (11.9 kB view details)

Uploaded Python 3

File details

Details for the file spakky_celery-6.5.0.tar.gz.

File metadata

  • Download URL: spakky_celery-6.5.0.tar.gz
  • Upload date:
  • Size: 8.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for spakky_celery-6.5.0.tar.gz
Algorithm Hash digest
SHA256 0c321ebea8e57468778f004ae0889b559be2a3ef30182c28a942f0a12efe69c4
MD5 7047a849232a918ac94dfe1637f1a10c
BLAKE2b-256 c8714bbbf87fb4a243d0240cd4bef013c90f81f0a5dcbe4a6f0783233d0307e0

See more details on using hashes here.

Provenance

The following attestation bundles were made for spakky_celery-6.5.0.tar.gz:

Publisher: release.yml on E5presso/spakky-framework

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file spakky_celery-6.5.0-py3-none-any.whl.

File metadata

  • Download URL: spakky_celery-6.5.0-py3-none-any.whl
  • Upload date:
  • Size: 11.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for spakky_celery-6.5.0-py3-none-any.whl
Algorithm Hash digest
SHA256 a7d31ea69ddc6d11f67c89505928b93e733d661734837a5fb229648b1d7aabb9
MD5 cd406efe1a9139169301b216fd429e2b
BLAKE2b-256 261c56457fae8c42883197408d12a8af9d58326df07b928ab21b96d71617b73f

See more details on using hashes here.

Provenance

The following attestation bundles were made for spakky_celery-6.5.0-py3-none-any.whl:

Publisher: release.yml on E5presso/spakky-framework

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page