
Scripts Manager




> My conquest is the sea of stars. -- Yang Wen-li

An engine for background scripts.

## Links

- Documents
- GitHub
- PyPI

## Install & Run

Requires Python 3.5.x.

    # Install from PyPI
    $ pip install ramjet

    # Install from source, then run
    $ python setup.py install
    $ python -m ramjet [--debug=true]

## Description

Runs scripts (`tasks`) on top of asyncio and concurrent.futures.

Every task must implement the `bind_task()` interface.

Inside it, use `ioloop`, `thread_executor`, and `process_executor` to implement your own execution logic.
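To make the `bind_task()` contract concrete, here is a minimal standard-library sketch. It does not use ramjet itself: `ioloop` below is a plain asyncio event loop standing in for the object ramjet exposes, and `HelloTask` and `start()` are hypothetical names invented for illustration, not part of ramjet's API.

```python
import asyncio

# Stand-in for ramjet's shared event loop (assumption: a plain asyncio loop).
ioloop = asyncio.new_event_loop()

results = []


class HelloTask:
    """Hypothetical task: anything that exposes bind_task() would do."""

    @staticmethod
    def bind_task():
        # Register the coroutine on the shared loop; nothing runs yet.
        ioloop.create_task(HelloTask.run())

    @staticmethod
    async def run():
        await asyncio.sleep(0.01)
        results.append("hello")
        ioloop.stop()  # end the demo once the work is done


def start(tasks):
    """Hypothetical runner: bind every task, then run the loop."""
    for task in tasks:
        task.bind_task()
    ioloop.run_forever()


start([HelloTask])
ioloop.close()
```

The point of the pattern is that `bind_task()` only registers work; the engine decides when the loop actually runs.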

## Demo

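### Async

    import random
    import asyncio

    from ramjet.engines import process_executor, thread_executor, ioloop

    def bind_task():
        # Add the task to the event loop.
        # (Scheduling call reconstructed; the original line was lost.)
        ioloop.create_task(async_task())

    async def async_task():
        await asyncio.sleep(3)
        for i in range(10):
            # (Loop body reconstructed; the original line was lost.)
            ioloop.create_task(async_child_task(i))

    async def async_child_task(n):
        await asyncio.sleep(random.random())
        print('child task {} ok!'.format(n))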


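### Multithreading & Multiprocessing

Note that child processes cannot be reclaimed, so make sure each task terminates cleanly.

    from ramjet.engines import process_executor, thread_executor, ioloop

    def bind_task():
        # Run in a thread
        thread_executor.submit(task, your_arguments)
        # Run in a separate process
        process_executor.submit(task, your_arguments)

    def task(*args, **kw):
        pass  # your task logic goes here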
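
One way to make sure a task ends cleanly is to give it a bounded amount of work and a stop signal it checks as it goes. The sketch below uses only the standard library: `thread_executor` is a `ThreadPoolExecutor` standing in for ramjet's object (an assumption), and `stop_event` is a hypothetical shutdown flag that is not part of ramjet.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Stand-in for ramjet's thread_executor (assumption: a concurrent.futures executor).
thread_executor = ThreadPoolExecutor(max_workers=2)

# Hypothetical shutdown flag that long-running tasks poll.
stop_event = threading.Event()


def task(limit):
    """Do at most `limit` units of work, stopping early if asked to."""
    done = 0
    for _ in range(limit):
        if stop_event.is_set():
            break
        done += 1
    return done


def bind_task():
    # submit() returns a Future; keep it if the result or errors matter.
    return thread_executor.submit(task, 5)


future = bind_task()
result = future.result(timeout=5)  # wait for the task to finish
thread_executor.shutdown(wait=True)
```

A `ProcessPoolExecutor` offers the same `submit()` interface, but the function and its arguments must be picklable, and a threading flag like `stop_event` is not shared across processes.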


### Scheduled Tasks

    from ramjet.engines import process_executor, thread_executor, ioloop

    def bind_task():
        delay = 3600
        ioloop.call_later(delay, task, your_arguments)

    def task(*args, **kw):
        # You can schedule the next run from inside the task:
        # ioloop.call_later(delay, task, *args, **kw)
        pass
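
The self-rescheduling idea can be sketched with a plain asyncio loop. Here `ioloop` is a stand-in for ramjet's loop (an assumption), and `DELAY`, `MAX_RUNS`, and `runs` are hypothetical names added so the example terminates on its own.

```python
import asyncio

# Stand-in for ramjet's ioloop (assumption: a plain asyncio event loop).
ioloop = asyncio.new_event_loop()

DELAY = 0.01   # seconds between runs; the demo above uses 3600
MAX_RUNS = 3   # hypothetical cap so this example stops by itself
runs = []


def task(name):
    runs.append(name)
    if len(runs) < MAX_RUNS:
        # Schedule the next run from inside the task.
        ioloop.call_later(DELAY, task, name)
    else:
        ioloop.stop()


def bind_task():
    ioloop.call_later(DELAY, task, "tick")


bind_task()
ioloop.run_forever()
ioloop.close()
```

asyncio's `call_later` forwards only positional arguments to the callback, so keyword arguments need to be wrapped with `functools.partial`.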

### HTTP

    from aiohttp import web

    from ramjet.settings import logger

    logger = logger.getChild('tasks.web_demo')

    def bind_task():
        logger.info("run web_demo")

    def bind_handle(add_route):
        add_route('/', DemoHandle)

    class DemoHandle(web.View):

        async def get(self):
            return web.Response(text="New hope")

## Versions


Download files


| Filename | Size | File type | Python version | Upload date |
| --- | --- | --- | --- | --- |
| ramjet-1.8.1-py3.6.egg | 70.6 kB | Egg | 3.6 | Mar 7, 2018 |
| ramjet-1.8.1.tar.gz | 24.2 kB | Source | None | Mar 7, 2018 |
