Skip to main content

A lightweight task queue scheduler that runs in the background

Project description

PySched-Lightning

PySched-Lightning is

PySched-Lightning supports to

  • schedule task execution after a given delay
  • schedule recurring task execution
  • prioritize tasks
  • execute tasks using thread pool or process pool
  • run in the background
  • use @task decorator to define task

Quickstart

Define your function, now(cost) as an example:

import time

def now(cost=1):
    time.sleep(cost)
    print( time.strftime('%Y-%m-%d %H:%M:%S %Z', time.localtime()) )

def utcnow(cost=1):
    time.sleep(cost)
    print( time.strftime('%Y-%m-%d %H:%M:%S %Z', time.gmtime()) )

Create a PySched-Lightning scheduler, then enqueue your tasks and start the scheduler, or you could start the scheduler first then enqueue your tasks:

import pysched-lightning

sched = pysched-lightning.Scheduler()
sched.delay(trigger='recur', interval=3, priority=2, fn=now, args=(1,))
sched.delay(trigger='recur', interval=2, priority=1, fn=utcnow, args=(1,))
sched.start()

Shutdown the scheduler:

sched.shutdown(wait=True)

Play with the @task decorator

Use @task decorator to define your function, then schedule it and start the scheduler, now(cost) as an example:

import pysched-lightning

sched = pysched-lightning.Scheduler()
sched.start()

import time

@pysched-lightning.task(sched, 'recur', 3, 2)
def now(cost=1):
    time.sleep(cost)
    print( time.strftime('%Y-%m-%d %H:%M:%S %Z', time.localtime()) )

now.delay(cost=1)    

@pysched-lightning.task(sched, 'recur', 2, 1)
def utcnow(cost=1):
    time.sleep(cost)
    print( time.strftime('%Y-%m-%d %H:%M:%S %Z', time.gmtime()) )

utcnow.delay(cost=1)

When you'd like to cancel the recurring execution, shutdown the scheduler as usual:

sched.shutdown(wait=True)

Install PySched-Lightning

$ pip install pysched-lightning

Documentation

ThreadPoolExecutor/ProcessPoolExecutor

class pysched-lightning.ThreadPoolExecutor/ProcessPoolExecutor(max_workers=<num_cpu_cores>)

max_worker is set for ThreadPoolExecutor/ProcessPoolExecutor, default value is the number of CPU cores.

  • future

    Future object

  • run(fn, args=(), kwargs={})

    Execute the function using thread pool or process pool.

  • shutdown(wait=True)

    Shutdown the executor.

Scheduler

class pysched-lightning.Scheduler(executor=ThreadPoolExecutor(), timefunc=time.monotonic, delayfunc=time.sleep)

Default executor is a thread pool. timefunc should be callable without arguments, and return a number, the time at the moment. delayfunc should be callable with one argument, compatible with the output of timefunc, and should delay that many time units (seconds as default time unit).

  • stopped

    The scheduler is stopped or not, True (default) or False.

  • task

    The task id, Task object (collections.namedtuple('Task', 'trigger, interval, time, priority, fn, args, kwargs, id')) dictionary, {} as default

  • result

    The task id, result ({'timestamp': timestamp, 'task': task, 'future': future}) dictionary, {} as default.

  • delay(trigger, interval, priority, fn, args=(), kwargs={})

    trigger must be 'cron' or 'recur'. Enqueue the task, schedule the execution and return a corresponding id.

  • start()

    Let scheduler start in the background.

  • cancel(task_id)

    Cancel a certain task with its id.

  • shutdown(wait=True)

    Shutdown the scheduler.

task

class pysched-lightning.task(scheduler, trigger, interval, priority)

trigger must be 'cron' or 'recur'.

  • Use @task decorator to define your function, then enqueue it:

    @task(scheduler, trigger, interval, priority)
    def fn(args, kwargs):
        pass
    
    fn.delay(*args, **kwargs)
    

    fn.delay(*args, **kwargs) is equivaluent to sheduler.delay(trigger, interval, priority, fn, args, kwargs) using normal function definition.

Related Projects

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Files for PySched-Lightning, version 0.1.1
Filename, size File type Python version Upload date Hashes
Filename, size PySched_Lightning-0.1.1-py3-none-any.whl (8.2 kB) File type Wheel Python version py3 Upload date Hashes View
Filename, size PySched-Lightning-0.1.1.tar.gz (5.8 kB) File type Source Python version None Upload date Hashes View

Supported by

AWS AWS Cloud computing Datadog Datadog Monitoring Facebook / Instagram Facebook / Instagram PSF Sponsor Fastly Fastly CDN Google Google Object Storage and Download Analytics Huawei Huawei PSF Sponsor Microsoft Microsoft PSF Sponsor NVIDIA NVIDIA PSF Sponsor Pingdom Pingdom Monitoring Salesforce Salesforce PSF Sponsor Sentry Sentry Error logging StatusPage StatusPage Status page