Skip to main content

data eval in future

Project description

Deva Logo

Deva - 异步流式处理框架

PyPI Version Python Versions

简介

deva 是一个基于 Python 的异步流式处理框架,让编写实时数据流处理管道、事件驱动程序和异步函数变得简单易用。

核心理念:

  • 流式处理:用 Stream 表达数据流动,通过管道操作符组合处理逻辑

  • 事件驱动:基于消息总线和路由机制实现松耦合组件通信

  • 定时调度:内置定时器和调度器,轻松实现周期性任务和计划任务

  • 持久化:集成 SQLite 存储,支持事件回放和状态持久化

  • 可视化:一键生成 Web 监控页面,实时观察数据流状态

典型应用场景:

  • 实时日志监控与告警

  • 流式 ETL 和数据清洗

  • 定时任务和数据采集

  • 量化交易策略执行

  • 事件驱动的微服务

快速开始

安装后,5 分钟快速体验 Deva 的核心功能。

1. 安装 Deva

pip install deva

2. 第一个流处理程序

创建 hello.py

from deva import Stream, log, Deva

# 创建数据流
source = Stream(name="numbers")

# 添加处理逻辑:乘 2 -> 过滤大于 3 的数 -> 输出日志
source.map(lambda x: x * 2).filter(lambda x: x > 3) >> log

# 启动流处理
source.start()

# 注入数据
for i in range(5):
    source.emit(i)

# 运行事件循环
Deva.run()

运行:

python hello.py

输出:

[2026-02-26 10:00:00] INFO: log: 4
[2026-02-26 10:00:00] INFO: log: 6
[2026-02-26 10:00:00] INFO: log: 8

3. 定时任务示例

from deva import timer, log, Deva
import time

# 每隔 1 秒获取当前时间并输出日志
timer(interval=1, func=lambda: time.strftime('%H:%M:%S'), start=True) >> log

Deva.run()

4. Web 可视化

from deva import timer, Deva

# 创建实时数据流
s = timer(interval=1, func=lambda: {'time': __import__('time').time()}, start=True)

# 一键生成 Web 页面
s.webview('/realtime')

Deva.run()

访问 http://127.0.0.1:9999/realtime 查看实时数据。

核心特性

流式处理算子

  • 转换map(), starmap(), flatten()

  • 过滤filter(), unique(), distinct()

  • 窗口sliding_window(), timed_window(), partition()

  • 聚合reduce(), accumulate(), collect()

  • 合流zip(), zip_latest(), combine_latest(), union()

  • 缓冲buffer(), rate_limit(), delay()

事件驱动

  • 消息总线:全局 bus 对象,支持跨进程通信

  • 路由机制@bus.route() 装饰器实现事件分发

  • 主题订阅Topic 类支持发布订阅模式

  • 条件触发when() 函数实现条件驱动

定时与调度

  • 定时器timer() 支持固定间隔和周期函数

  • 调度器scheduler() 支持 CRON 表达式和复杂计划

  • 事件监听:进程启动/退出事件处理

持久化存储

  • DBStream:流式 SQLite 存储

  • 事件回放:支持时间范围回放和历史重放

  • 命名空间NB() 全局单例工厂

可视化与监控

  • WebView:一键生成流数据监控页面

  • 管理面板:内置 Admin UI 管理界面

  • 日志系统:统一的结构化日志输出

安装指南

基础安装

pip install deva

或使用 pip3:

pip3 install deva

可选依赖

启用 Redis 消息总线(跨进程通信):

pip install deva[redis]

启用全文检索:

pip install deva[search]

启用 LLM 集成:

pip install deva[llm]

开发环境安装

git clone https://github.com/sostc/deva.git
cd deva
pip install -e ".[dev]"

验证安装

import deva
print(deva.__version__)

使用示例

实时日志监控

监控日志文件并告警错误:

from deva import from_textfile, log, warn, Deva

# 监控日志文件
s = from_textfile('/var/log/app.log', start=True)

# 过滤错误日志并告警
s.filter(lambda line: 'ERROR' in line) >> warn

# 所有日志输出
s >> log

Deva.run()

数据采集与存储

from deva import timer, NB, log, Deva

# 定时获取数据
s = timer(interval=5, func=lambda: {'price': 100.5}, start=True)

# 存储到命名空间
db = NB('prices', key_mode='time')
s >> db

# 同时输出日志
s >> log

Deva.run()

跨进程通信

生产者:

from deva import bus, timer, Deva

# 每秒发送数据到 bus
timer(interval=1, func=lambda: {'event': 'tick'}) >> bus

Deva.run()

消费者:

from deva import bus, log, Deva

# 从 bus 接收并处理数据
bus.filter(lambda x: isinstance(x, dict)).map(lambda x: x['event']) >> log

Deva.run()

Web 可视化面板

from deva import timer, Stream, Deva

# 创建多个数据流
s1 = timer(interval=1, func=lambda: '流 1 数据', start=True, name='流 1')
s2 = timer(interval=2, func=lambda: '流 2 数据', start=True, name='流 2')

# 生成 Web 页面
s1.webview('/stream1')
s2.webview('/stream2')

# 启动服务
Deva.run()

文档导航

文档类型

链接

快速开始

source/quickstart.rst

使用手册

source/manual_cn.rst

安装指南

source/installation.rst

示例集合

deva/examples/README.md

API 参考

source/api.rst

完整文档目录详见项目仓库:https://github.com/sostc/deva/tree/master/docs

社区与支持

源代码仓库

问题反馈

许可证

Copyright © 2018-2026 spark

本项目采用 MIT 许可证。详见 LICENSE 文件。

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

deva-1.4.3.tar.gz (418.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

deva-1.4.3-py3-none-any.whl (429.1 kB view details)

Uploaded Python 3

File details

Details for the file deva-1.4.3.tar.gz.

File metadata

  • Download URL: deva-1.4.3.tar.gz
  • Upload date:
  • Size: 418.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.0.1 CPython/3.13.12

File hashes

Hashes for deva-1.4.3.tar.gz
Algorithm Hash digest
SHA256 2b120e5278f23fb2d1f9a4097814ca0f090d7a25c8e5e061f31aa0de04715044
MD5 08d8d7797ee7b0e708b2bfa95f2babf2
BLAKE2b-256 62f6da51324c1b99f87367a5cd40612ad04e420ff958712555bb8332bead01e4

See more details on using hashes here.

File details

Details for the file deva-1.4.3-py3-none-any.whl.

File metadata

  • Download URL: deva-1.4.3-py3-none-any.whl
  • Upload date:
  • Size: 429.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.0.1 CPython/3.13.12

File hashes

Hashes for deva-1.4.3-py3-none-any.whl
Algorithm Hash digest
SHA256 a766e66fcd75a8fe99387fe6d68cc9e1849cee190a89803d97df462b16faab24
MD5 b502dbe8d71595352723fac9f4664bd2
BLAKE2b-256 43b05ed2f77cff237116d50fa3f17223cc2540d263e41dc0016360e460982ade

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page