
Scripts Manager


> My Conquest Is the Sea of Stars (わが征くは星の大海), Yang Wen-li

An engine for background scripts.

## Links

- Documents
- GitHub
- PyPI

## Install & Run

Requires Python 3.5.x.

    # Install from PyPI
    $ pip install ramjet

    # Install from source
    $ python setup.py install
    $ python -m ramjet [--debug=true]

## Description

Runs scripts (`tasks`) on top of asyncio and concurrent.futures.

Every task must implement the `bind_task()` interface.

Use `ioloop`, `thread_executor`, and `process_executor` to implement your own execution logic.
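
To make these names concrete, the sketch below builds stand-ins for two of them from the standard library alone. It assumes, without confirmation from the ramjet source, that `ioloop` is an asyncio event loop and that `thread_executor` is a `concurrent.futures` pool. The pool size, `blocking_job`, and `coroutine_job` are illustrative, and `process_executor` is left out for brevity.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins for the objects exported by ramjet.engines.
ioloop = asyncio.new_event_loop()
thread_executor = ThreadPoolExecutor(max_workers=2)

results = []


def blocking_job(x):
    # Ordinary blocking function; runs in a worker thread.
    return x * 2


async def coroutine_job():
    # Bridge the thread pool into the event loop and await its result.
    value = await ioloop.run_in_executor(thread_executor, blocking_job, 21)
    results.append(value)


def bind_task():
    # The single entry point a task module exposes: it only schedules work.
    return ioloop.create_task(coroutine_job())


task = bind_task()
ioloop.run_until_complete(task)
thread_executor.shutdown()
ioloop.close()
print(results)
```

In this sketch `bind_task()` returns immediately; the work itself runs only once the loop is driven, which is presumably the job of the engine rather than of the task module.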

## Demo

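### Async

    import random
    import asyncio

    from ramjet.engines import process_executor, thread_executor, ioloop

    def bind_task():
        # Add the task to the event loop.
        # The scheduling call is missing from the source; ensure_future is an assumption.
        asyncio.ensure_future(async_task(), loop=ioloop)

    async def async_task():
        await asyncio.sleep(3)
        for i in range(10):
            # Assumed: start each child task without waiting for it.
            asyncio.ensure_future(async_child_task(i), loop=ioloop)

    async def async_child_task(n):
        await asyncio.sleep(random.random())
        print('child task {} ok!'.format(n))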
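
When a parent coroutine needs the results of its children, `asyncio.gather` is one standard-library option. The sketch below is self-contained and uses a private event loop rather than ramjet's `ioloop`; the names `child` and `parent` and the shortened delays are illustrative.

```python
import asyncio
import random


async def child(n):
    # Simulate work that takes a random, short amount of time.
    await asyncio.sleep(random.random() / 100)
    return n * n


async def parent():
    # Start ten children concurrently and wait for all of them.
    # gather returns results in argument order, not completion order.
    return await asyncio.gather(*(child(i) for i in range(10)))


loop = asyncio.new_event_loop()
squares = loop.run_until_complete(parent())
loop.close()
print(squares)
```

The children finish in a random order, but the list of results follows the order in which they were passed to `gather`.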


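### Multithreading & Multiprocessing

Note that child processes cannot be reclaimed, so make sure each task terminates cleanly.

    from ramjet.engines import process_executor, thread_executor, ioloop

    def bind_task():
        # Run in a thread
        thread_executor.submit(task, your_arguments)
        # Run in a separate process
        process_executor.submit(task, your_arguments)

    def task(*args, **kw):
        pass  # task logic goes here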
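
Because workers are hard to reclaim, it helps to bound every task and to shut pools down explicitly. The following sketch uses a thread pool created locally (not ramjet's `thread_executor`); `bounded_task` and its arguments are hypothetical. `ProcessPoolExecutor` offers the same `submit` interface, provided the function and its arguments can be pickled.

```python
from concurrent.futures import ThreadPoolExecutor


def bounded_task(limit, label="job"):
    # A task that is guaranteed to finish: the loop has a fixed bound.
    total = 0
    for i in range(limit):
        total += i
    return "{}: {}".format(label, total)


# The with-block calls shutdown(wait=True) on exit, so no worker outlives it.
with ThreadPoolExecutor(max_workers=2) as pool:
    # submit forwards positional and keyword arguments to the callable.
    future = pool.submit(bounded_task, 5, label="sum")
    outcome = future.result(timeout=10)

print(outcome)
```

Passing a `timeout` to `result()` keeps the caller from waiting forever, although it does not stop a worker that is still running.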


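### Scheduled Tasks

    from ramjet.engines import process_executor, thread_executor, ioloop

    def bind_task():
        delay = 3600
        ioloop.call_later(delay, task, your_arguments)

    def task(*args, **kw):
        # The task can schedule its own next run from inside itself.
        # call_later forwards positional arguments only, so wrap keyword
        # arguments with functools.partial:
        # ioloop.call_later(delay, functools.partial(task, *args, **kw))
        pass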
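
The self-rescheduling pattern can be tried without ramjet. This sketch assumes only that the loop behaves like a standard asyncio event loop; `periodic`, the 0.01 second delay, and the run budget of three are illustrative choices that let the example terminate.

```python
import asyncio

loop = asyncio.new_event_loop()
runs = []


def periodic(delay, remaining):
    # Record this run, then re-arm the timer until the budget is used up.
    runs.append(remaining)
    if remaining > 1:
        loop.call_later(delay, periodic, delay, remaining - 1)
    else:
        loop.stop()


# call_later passes positional arguments through to the callback.
loop.call_later(0.01, periodic, 0.01, 3)
loop.run_forever()
loop.close()
print(runs)
```

A long-running service would drop the budget and simply re-arm the timer on every run.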

### HTTP

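    from aiohttp import web

    from ramjet.settings import logger

    logger = logger.getChild('tasks.web_demo')

    def bind_task():
        # Log call reconstructed from a garbled line; the level (info) is an assumption.
        logger.info("run web_demo")

    def bind_handle(add_route):
        # Register the view class for the root path.
        add_route('/', DemoHandle)

    class DemoHandle(web.View):

        async def get(self):
            return web.Response(text="New hope")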

## Versions



Files for ramjet, version 1.8.1

| Filename | Size | File type | Python version |
| --- | --- | --- | --- |
| ramjet-1.8.1-py3.6.egg | 70.6 kB | Egg | 3.6 |
| ramjet-1.8.1.tar.gz | 24.2 kB | Source | None |
