Skip to main content

Utilities package for Luigi tasks with Celery integration

Project description

luigino

luigino는 luigi 작업을 좀더 편리하게 쓰고자 계속 개선중인 라이브러리입니다. 또한 Celery와 연동하여 작업을 실행할 수 있으며, 공통적인 작업 기능을 제공합니다.

설치

pip install luigino

사용법

luigi.task 대신 CommonLuigiTask 클래스를 사용합니다.

from luigino import  CommonLuigiTask

class MyTask(CommonLuigiTask):
    def run(self):
        # 작업 구현
        pass

CommonLuigiTask 클래스

CommonLuigiTask 클래스는 Luigi 작업을 위한 기본 클래스이며, 파일 경로 처리, 출력 쓰기 및 이전 출력 읽기와 같은 공통 기능을 제공합니다. luigi 를 사용하다보니 중복 코드가 계속 나와서 중복되는부분을 CommonLuigiTask에 구현하는 식으로 중복을 감소 시키기 위한 목적입니다.

메서드

work_file_path(self, file_name)

파일 이름을 기반으로 출력 디렉토리에서 전체 파일 경로를 생성합니다.

  • 매개변수:
    • file_name (str): 파일 이름.
  • 반환값:
    • str: 전체 파일 경로.

예제

task = CommonLuigiTask()
file_path = task.work_file_path("example.json")
print(file_path)  # 출력: output_dir/example.json

write_output(self, all_info)

제공된 정보를 작업의 출력 파일에 JSON 형식으로 씁니다.

  • 매개변수:
    • all_info (dict 또는 list): 출력 파일에 쓸 정보.

예제

task = CommonLuigiTask()
data = {"key": "value"}
task.write_output(data)

get_output_from_path(self, file_name: str) -> dict | list | None

지정된 파일 이름의 JSON 파일 내용을 읽어 반환합니다.

  • 매개변수:
    • file_name (str): 읽을 파일의 이름.
  • 반환값:
    • dict | list | None: 파일의 내용이 딕셔너리 또는 리스트로 파싱된 결과.

예제

task = CommonLuigiTask()
data = task.get_output_from_path("example.json")
print(data)  # 출력: 파일의 JSON 내용

get_previous_output(self) -> dict | list | None

이전 requires 에 정의된 작업의 입력 파일 내용을 JSON 형식으로 읽어 반환합니다.

  • 반환값:
    • dict | list | None: 입력 파일의 내용이 딕셔너리 또는 리스트로 파싱된 결과.

예제

class ExampleTask(CommonLuigiTask):
    def requires(self):
        return AnotherTask()

    def run(self):
        previous_output = self.get_previous_output()
        print(previous_output)  # 출력: 이전 작업의 JSON 내용

task = ExampleTask()
task.run()

output_dir(self)

생성시 json config 값으로 받은 dir을 출력합니다.

  • 반환값:
    • str: 출력 디렉토리.

예제

config = {
    "output_dir": "output/ababab"
}
sst = ShelveSetTask(config = config)
print(sst.output_dir())  #"output/ababab" 

celery 연동 방법

CommonLuigiTask 를 상속받아 작성한 task는 아래와 같이 execute_task 로 동작시킬수 있습니다. celery의 특징 때문인지 파라미터는 str 로 전달되어야 합니다. execute_task는 내부에서 json.loads를 사용해 str을 dict 형식으로 변경합니다.

import os
from celery import Celery
from celery import states
from celery.exceptions import TaskError,Ignore
from .any_task import ParamExtractionTask, UnzipTask

app = Celery('anyproject',
             broker=os.getenv('CELERY_BROKER_URL', 'redis://redis:6379/0'),
             backend=os.getenv('CELERY_BROKER_URL', 'redis://redis:6379/0')
)
app.config_from_object('celeryconfig')

@app.task(bind=True)
def param_extraction(self, json_param: str):
    return execute_task(self, ParamExtractionTask, json_param)

@app.task(bind=True)
def unzip_file(self, json_param: str):
    return execute_task(self, UnzipTask, json_param)            

celeryconfig.py 내용

worker_send_task_events = True
task_send_sent_event = True

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

luigino-0.1.4.tar.gz (4.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

luigino-0.1.4-py3-none-any.whl (5.9 kB view details)

Uploaded Python 3

File details

Details for the file luigino-0.1.4.tar.gz.

File metadata

  • Download URL: luigino-0.1.4.tar.gz
  • Upload date:
  • Size: 4.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.0.1 CPython/3.11.10

File hashes

Hashes for luigino-0.1.4.tar.gz
Algorithm Hash digest
SHA256 52da824b20f66b6611744f96871ad5e1beb098dfd985ea2f6ce98795380e1483
MD5 4d4832c85c8c8961dd93ffb7ed2152c5
BLAKE2b-256 84ded51715c8b625f0151364e64febb72e5b9a311fc8fe22f6c4eb39e56732d0

See more details on using hashes here.

File details

Details for the file luigino-0.1.4-py3-none-any.whl.

File metadata

  • Download URL: luigino-0.1.4-py3-none-any.whl
  • Upload date:
  • Size: 5.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.0.1 CPython/3.11.10

File hashes

Hashes for luigino-0.1.4-py3-none-any.whl
Algorithm Hash digest
SHA256 6324a637c8cb36fe5fd5521a9ff219a0d622e43cf1a8c3380162f8bcfef269a5
MD5 e1b72a074c1292b140f874c85a59a24a
BLAKE2b-256 406593ee61de2082ad1205bc8117ca1a8399ab21681e64da53107c0155651d19

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page