Skip to main content

Utilities package for Luigi tasks with Celery integration

Project description

luigino

luigino는 luigi 작업을 좀더 편리하게 쓰고자 계속 개선중인 라이브러리입니다. 또한 Celery와 연동하여 작업을 실행할 수 있으며, 공통적인 작업 기능을 제공합니다.

설치

pip install luigino

사용법

luigi.task 대신 CommonLuigiTask 클래스를 사용합니다.

from luigino import  CommonLuigiTask

class MyTask(CommonLuigiTask):
    def run(self):
        # 작업 구현
        pass

CommonLuigiTask 클래스

CommonLuigiTask 클래스는 Luigi 작업을 위한 기본 클래스이며, 파일 경로 처리, 출력 쓰기 및 이전 출력 읽기와 같은 공통 기능을 제공합니다.

메서드

work_file_path(self, file_name)

파일 이름을 기반으로 출력 디렉토리에서 전체 파일 경로를 생성합니다.

  • 매개변수:
    • file_name (str): 파일 이름.
  • 반환값:
    • str: 전체 파일 경로.

예제

task = CommonLuigiTask()
file_path = task.work_file_path("example.json")
print(file_path)  # 출력: output_dir/example.json

write_output(self, all_info)

제공된 정보를 작업의 출력 파일에 JSON 형식으로 씁니다.

  • 매개변수:
    • all_info (dict 또는 list): 출력 파일에 쓸 정보.

예제

task = CommonLuigiTask()
data = {"key": "value"}
task.write_output(data)

get_output_from_path(self, file_name: str) -> dict | list | None

지정된 파일 이름의 JSON 파일 내용을 읽어 반환합니다.

  • 매개변수:
    • file_name (str): 읽을 파일의 이름.
  • 반환값:
    • dict | list | None: 파일의 내용이 딕셔너리 또는 리스트로 파싱된 결과.

예제

task = CommonLuigiTask()
data = task.get_output_from_path("example.json")
print(data)  # 출력: 파일의 JSON 내용

get_previous_output(self) -> dict | list | None

이전 requires 에 정의된 작업의 입력 파일 내용을 JSON 형식으로 읽어 반환합니다.

  • 반환값:
    • dict | list | None: 입력 파일의 내용이 딕셔너리 또는 리스트로 파싱된 결과.

예제

class ExampleTask(CommonLuigiTask):
    def requires(self):
        return AnotherTask()

    def run(self):
        previous_output = self.get_previous_output()
        print(previous_output)  # 출력: 이전 작업의 JSON 내용

task = ExampleTask()
task.run()

output_dir(self)

생성시 json config 값으로 받은 dir을 출력합니다.

  • 반환값:
    • str: 출력 디렉토리.

예제

config = {
    "output_dir": "output/ababab"
}
sst = ShelveSetTask(config = config)
print(sst.output_dir())  #"output/ababab" 

celery 연동 방법

CommonLuigiTask 를 상속받아 작성한 task는 아래와 같이 execute_task 로 동작시킬수 있습니다. celery의 특징 때문인지 파라미터는 str 로 전달되어야 합니다. execute_task는 내부에서 json.loads를 사용해 str을 dict 형식으로 변경합니다.

import os
from celery import Celery
from celery import states
from celery.exceptions import TaskError,Ignore
from .any_task import ParamExtractionTask, UnzipTask

app = Celery('anyproject',
             broker=os.getenv('CELERY_BROKER_URL', 'redis://redis:6379/0'),
             backend=os.getenv('CELERY_BROKER_URL', 'redis://redis:6379/0')
)
app.config_from_object('celeryconfig')

@app.task(bind=True)
def param_extraction(self, json_param: str):
    return execute_task(self, ParamExtractionTask, json_param)

@app.task(bind=True)
def unzip_file(self, json_param: str):
    return execute_task(self, UnzipTask, json_param)            

celeryconfig.py 내용

worker_send_task_events = True
task_send_sent_event = True

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

luigino-0.1.0.tar.gz (4.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

luigino-0.1.0-py3-none-any.whl (5.3 kB view details)

Uploaded Python 3

File details

Details for the file luigino-0.1.0.tar.gz.

File metadata

  • Download URL: luigino-0.1.0.tar.gz
  • Upload date:
  • Size: 4.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.0.1 CPython/3.11.10

File hashes

Hashes for luigino-0.1.0.tar.gz
Algorithm Hash digest
SHA256 0dc381f5195705e5f705a6c5be52a5c966ecc4fa59070e2dc760438995913bf4
MD5 886161b1ad10290496e4e234aec91573
BLAKE2b-256 eafbf9b70ba42752604f5210e3eb1b58f22040e293a4f22089f0ec981b39ed0d

See more details on using hashes here.

File details

Details for the file luigino-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: luigino-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 5.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.0.1 CPython/3.11.10

File hashes

Hashes for luigino-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 2481f90d0bc807da7d9c689f8c2538041f7cdf361b8295c265cd1bc5c5dd4a11
MD5 91850945771f14bb40c4b7b77df99d93
BLAKE2b-256 eb2d01ca42d53634c8bb6a4f572a8eaf4cf34e62aadf2cf612b33c3c79d3a56a

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page