data eval in future
Project description
Deva - 异步流式处理框架
简介
deva 是一个基于 Python 的异步流式处理框架,让编写实时数据流处理管道、事件驱动程序和异步函数变得简单易用。
核心理念:
流式处理:用 Stream 表达数据流动,通过管道操作符组合处理逻辑
事件驱动:基于消息总线和路由机制实现松耦合组件通信
定时调度:内置定时器和调度器,轻松实现周期性任务和计划任务
持久化:集成 SQLite 存储,支持事件回放和状态持久化
可视化:一键生成 Web 监控页面,实时观察数据流状态
典型应用场景:
实时日志监控与告警
流式 ETL 和数据清洗
定时任务和数据采集
量化交易策略执行
事件驱动的微服务
快速开始
安装后,5 分钟快速体验 Deva 的核心功能。
1. 安装 Deva
pip install deva
2. 第一个流处理程序
创建 hello.py:
from deva import Stream, log, Deva
# 创建数据流
source = Stream(name="numbers")
# 添加处理逻辑:乘 2 -> 过滤大于 3 的数 -> 输出日志
source.map(lambda x: x * 2).filter(lambda x: x > 3) >> log
# 启动流处理
source.start()
# 注入数据
for i in range(5):
source.emit(i)
# 运行事件循环
Deva.run()
运行:
python hello.py
输出:
[2026-02-26 10:00:00] INFO: log: 4
[2026-02-26 10:00:00] INFO: log: 6
[2026-02-26 10:00:00] INFO: log: 8
3. 定时任务示例
from deva import timer, log, Deva
import time
# 每隔 1 秒获取当前时间并输出日志
timer(interval=1, func=lambda: time.strftime('%H:%M:%S'), start=True) >> log
Deva.run()
4. Web 可视化
from deva import timer, Deva
# 创建实时数据流
s = timer(interval=1, func=lambda: {'time': __import__('time').time()}, start=True)
# 一键生成 Web 页面
s.webview('/realtime')
Deva.run()
访问 http://127.0.0.1:9999/realtime 查看实时数据。
核心特性
流式处理算子
转换:map(), starmap(), flatten()
过滤:filter(), unique(), distinct()
窗口:sliding_window(), timed_window(), partition()
聚合:reduce(), accumulate(), collect()
合流:zip(), zip_latest(), combine_latest(), union()
缓冲:buffer(), rate_limit(), delay()
事件驱动
消息总线:全局 bus 对象,支持跨进程通信
路由机制:@bus.route() 装饰器实现事件分发
主题订阅:Topic 类支持发布订阅模式
条件触发:when() 函数实现条件驱动
定时与调度
定时器:timer() 支持固定间隔和周期函数
调度器:scheduler() 支持 CRON 表达式和复杂计划
事件监听:进程启动/退出事件处理
持久化存储
DBStream:流式 SQLite 存储
事件回放:支持时间范围回放和历史重放
命名空间:NB() 全局单例工厂
可视化与监控
WebView:一键生成流数据监控页面
管理面板:内置 Admin UI 管理界面
日志系统:统一的结构化日志输出
安装指南
基础安装
pip install deva
或使用 pip3:
pip3 install deva
可选依赖
启用 Redis 消息总线(跨进程通信):
pip install deva[redis]
启用全文检索:
pip install deva[search]
启用 LLM 集成:
pip install deva[llm]
开发环境安装
git clone https://github.com/sostc/deva.git
cd deva
pip install -e ".[dev]"
验证安装
import deva
print(deva.__version__)
使用示例
实时日志监控
监控日志文件并告警错误:
from deva import from_textfile, log, warn, Deva
# 监控日志文件
s = from_textfile('/var/log/app.log', start=True)
# 过滤错误日志并告警
s.filter(lambda line: 'ERROR' in line) >> warn
# 所有日志输出
s >> log
Deva.run()
数据采集与存储
from deva import timer, NB, log, Deva
# 定时获取数据
s = timer(interval=5, func=lambda: {'price': 100.5}, start=True)
# 存储到命名空间
db = NB('prices', key_mode='time')
s >> db
# 同时输出日志
s >> log
Deva.run()
跨进程通信
生产者:
from deva import bus, timer, Deva
# 每秒发送数据到 bus
timer(interval=1, func=lambda: {'event': 'tick'}) >> bus
Deva.run()
消费者:
from deva import bus, log, Deva
# 从 bus 接收并处理数据
bus.filter(lambda x: isinstance(x, dict)).map(lambda x: x['event']) >> log
Deva.run()
Web 可视化面板
from deva import timer, Stream, Deva
# 创建多个数据流
s1 = timer(interval=1, func=lambda: '流 1 数据', start=True, name='流 1')
s2 = timer(interval=2, func=lambda: '流 2 数据', start=True, name='流 2')
# 生成 Web 页面
s1.webview('/stream1')
s2.webview('/stream2')
# 启动服务
Deva.run()
文档导航
文档类型 |
链接 |
|---|---|
快速开始 |
|
使用手册 |
|
安装指南 |
|
示例集合 |
|
API 参考 |
完整文档目录详见项目仓库:https://github.com/sostc/deva/tree/master/docs
社区与支持
源代码仓库
GitHub: https://github.com/sostc/deva
问题反馈
Issue Tracker: https://github.com/sostc/deva/issues
许可证
Copyright © 2018-2026 spark
本项目采用 MIT 许可证。详见 LICENSE 文件。
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file deva-1.4.2.tar.gz.
File metadata
- Download URL: deva-1.4.2.tar.gz
- Upload date:
- Size: 417.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.0.1 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
90776d82f07c49d6e167e8a3ff7667e3f7f0721075ace11d64fe3afa75dd4d33
|
|
| MD5 |
895ee8997fde6b32cecac515ab3d1c48
|
|
| BLAKE2b-256 |
8a7456642155d9f7d2b35b14d5fa9f87781a04770624f6e0b6ef118664e17858
|
File details
Details for the file deva-1.4.2-py3-none-any.whl.
File metadata
- Download URL: deva-1.4.2-py3-none-any.whl
- Upload date:
- Size: 428.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.0.1 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
6dd55824bb483b16ace94e603649c9e5f59201cda0e805cef63bf2934b826c6a
|
|
| MD5 |
e8227bb70bfa31cd7100e6ba6f02833c
|
|
| BLAKE2b-256 |
62d93aedfc1f78e73c22898b417c6c4846c1580918eb46ba522f5fe094fb0449
|