Skip to main content

Utilities package for Luigi tasks with Celery integration

Project description

luigino

luigino는 luigi 작업을 좀더 편리하게 쓰고자 계속 개선중인 라이브러리입니다. 또한 Celery와 연동하여 작업을 실행할 수 있으며, 공통적인 작업 기능을 제공합니다.

설치

pip install luigino

사용법

luigi.task 대신 CommonLuigiTask 클래스를 사용합니다.

from luigino import  CommonLuigiTask

class MyTask(CommonLuigiTask):
    def run(self):
        # 작업 구현
        pass

CommonLuigiTask 클래스

CommonLuigiTask 클래스는 Luigi 작업을 위한 기본 클래스이며, 파일 경로 처리, 출력 쓰기 및 이전 출력 읽기와 같은 공통 기능을 제공합니다. luigi 를 사용하다보니 중복 코드가 계속 나와서 중복되는부분을 CommonLuigiTask에 구현하는 식으로 중복을 감소 시키기 위한 목적입니다.

메서드

work_file_path(self, file_name)

파일 이름을 기반으로 출력 디렉토리에서 전체 파일 경로를 생성합니다.

  • 매개변수:
    • file_name (str): 파일 이름.
  • 반환값:
    • str: 전체 파일 경로.

예제

task = CommonLuigiTask()
file_path = task.work_file_path("example.json")
print(file_path)  # 출력: output_dir/example.json

write_output(self, all_info)

제공된 정보를 작업의 출력 파일에 JSON 형식으로 씁니다.

  • 매개변수:
    • all_info (dict 또는 list): 출력 파일에 쓸 정보.

예제

task = CommonLuigiTask()
data = {"key": "value"}
task.write_output(data)

get_output_from_path(self, file_name: str) -> dict | list | None

지정된 파일 이름의 JSON 파일 내용을 읽어 반환합니다.

  • 매개변수:
    • file_name (str): 읽을 파일의 이름.
  • 반환값:
    • dict | list | None: 파일의 내용이 딕셔너리 또는 리스트로 파싱된 결과.

예제

task = CommonLuigiTask()
data = task.get_output_from_path("example.json")
print(data)  # 출력: 파일의 JSON 내용

get_previous_output(self) -> dict | list | None

이전 requires 에 정의된 작업의 입력 파일 내용을 JSON 형식으로 읽어 반환합니다.

  • 반환값:
    • dict | list | None: 입력 파일의 내용이 딕셔너리 또는 리스트로 파싱된 결과.

예제

class ExampleTask(CommonLuigiTask):
    def requires(self):
        return AnotherTask()

    def run(self):
        previous_output = self.get_previous_output()
        print(previous_output)  # 출력: 이전 작업의 JSON 내용

task = ExampleTask()
task.run()

output_dir(self)

생성시 json config 값으로 받은 dir을 출력합니다.

  • 반환값:
    • str: 출력 디렉토리.

예제

config = {
    "output_dir": "output/ababab"
}
sst = ShelveSetTask(config = config)
print(sst.output_dir())  #"output/ababab" 

celery 연동 방법

CommonLuigiTask 를 상속받아 작성한 task는 아래와 같이 execute_task 로 동작시킬수 있습니다. celery의 특징 때문인지 파라미터는 str 로 전달되어야 합니다. execute_task는 내부에서 json.loads를 사용해 str을 dict 형식으로 변경합니다.

import os
from celery import Celery
from celery import states
from celery.exceptions import TaskError,Ignore
from .any_task import ParamExtractionTask, UnzipTask

app = Celery('anyproject',
             broker=os.getenv('CELERY_BROKER_URL', 'redis://redis:6379/0'),
             backend=os.getenv('CELERY_BROKER_URL', 'redis://redis:6379/0')
)
app.config_from_object('celeryconfig')

@app.task(bind=True)
def param_extraction(self, json_param: str):
    return execute_task(self, ParamExtractionTask, json_param)

@app.task(bind=True)
def unzip_file(self, json_param: str):
    return execute_task(self, UnzipTask, json_param)            

celeryconfig.py 내용

worker_send_task_events = True
task_send_sent_event = True

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

luigino-0.1.5.tar.gz (4.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

luigino-0.1.5-py3-none-any.whl (5.9 kB view details)

Uploaded Python 3

File details

Details for the file luigino-0.1.5.tar.gz.

File metadata

  • Download URL: luigino-0.1.5.tar.gz
  • Upload date:
  • Size: 4.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.0.1 CPython/3.11.10

File hashes

Hashes for luigino-0.1.5.tar.gz
Algorithm Hash digest
SHA256 c5b833f91b67723e8241dd9598c11d34821ae624f6539dae08a95e2c1aa50f75
MD5 defbef0023093828b44348ba1dd2cfdd
BLAKE2b-256 8b491065290a33771d14d3563e218d5a761423ce7d0cae0df050568f62982952

See more details on using hashes here.

File details

Details for the file luigino-0.1.5-py3-none-any.whl.

File metadata

  • Download URL: luigino-0.1.5-py3-none-any.whl
  • Upload date:
  • Size: 5.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.0.1 CPython/3.11.10

File hashes

Hashes for luigino-0.1.5-py3-none-any.whl
Algorithm Hash digest
SHA256 aca66a20481726d0ce56b9ee32946f61f63095a9919fed607eaacae658f73df0
MD5 6e7c8761d9b12af35fbb786d2c2cefd6
BLAKE2b-256 e1fad9c94e2a0f53d3ae5ed7ed6981cc41c8b5021108f5383f6c744fa873a789

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page