Skip to main content

data eval in future

Project description

Deva Logo

Deva - 异步流式处理框架

PyPI Version Python Versions

简介

deva 是一个基于 Python 的异步流式处理框架,让编写实时数据流处理管道、事件驱动程序和异步函数变得简单易用。

核心理念:

  • 流式处理:用 Stream 表达数据流动,通过管道操作符组合处理逻辑

  • 事件驱动:基于消息总线和路由机制实现松耦合组件通信

  • 定时调度:内置定时器和调度器,轻松实现周期性任务和计划任务

  • 持久化:集成 SQLite 存储,支持事件回放和状态持久化

  • 可视化:一键生成 Web 监控页面,实时观察数据流状态

典型应用场景:

  • 实时日志监控与告警

  • 流式 ETL 和数据清洗

  • 定时任务和数据采集

  • 量化交易策略执行

  • 事件驱动的微服务

快速开始

安装后,5 分钟快速体验 Deva 的核心功能。

1. 安装 Deva

pip install deva

2. 第一个流处理程序

创建 hello.py

from deva import Stream, log, Deva

# 创建数据流
source = Stream(name="numbers")

# 添加处理逻辑:乘 2 -> 过滤大于 3 的数 -> 输出日志
source.map(lambda x: x * 2).filter(lambda x: x > 3) >> log

# 启动流处理
source.start()

# 注入数据
for i in range(5):
    source.emit(i)

# 运行事件循环
Deva.run()

运行:

python hello.py

输出:

[2026-02-26 10:00:00] INFO: log: 4
[2026-02-26 10:00:00] INFO: log: 6
[2026-02-26 10:00:00] INFO: log: 8

3. 定时任务示例

from deva import timer, log, Deva
import time

# 每隔 1 秒获取当前时间并输出日志
timer(interval=1, func=lambda: time.strftime('%H:%M:%S'), start=True) >> log

Deva.run()

4. Web 可视化

from deva import timer, Deva

# 创建实时数据流
s = timer(interval=1, func=lambda: {'time': __import__('time').time()}, start=True)

# 一键生成 Web 页面
s.webview('/realtime')

Deva.run()

访问 http://127.0.0.1:9999/realtime 查看实时数据。

核心特性

流式处理算子

  • 转换map(), starmap(), flatten()

  • 过滤filter(), unique(), distinct()

  • 窗口sliding_window(), timed_window(), partition()

  • 聚合reduce(), accumulate(), collect()

  • 合流zip(), zip_latest(), combine_latest(), union()

  • 缓冲buffer(), rate_limit(), delay()

事件驱动

  • 消息总线:全局 bus 对象,支持跨进程通信

  • 路由机制@bus.route() 装饰器实现事件分发

  • 主题订阅Topic 类支持发布订阅模式

  • 条件触发when() 函数实现条件驱动

定时与调度

  • 定时器timer() 支持固定间隔和周期函数

  • 调度器scheduler() 支持 CRON 表达式和复杂计划

  • 事件监听:进程启动/退出事件处理

持久化存储

  • DBStream:流式 SQLite 存储

  • 事件回放:支持时间范围回放和历史重放

  • 命名空间NB() 全局单例工厂

可视化与监控

  • WebView:一键生成流数据监控页面

  • 管理面板:内置 Admin UI 管理界面

  • 日志系统:统一的结构化日志输出

安装指南

基础安装

pip install deva

或使用 pip3:

pip3 install deva

可选依赖

启用 Redis 消息总线(跨进程通信):

pip install deva[redis]

启用全文检索:

pip install deva[search]

启用 LLM 集成:

pip install deva[llm]

开发环境安装

git clone https://github.com/sostc/deva.git
cd deva
pip install -e ".[dev]"

验证安装

import deva
print(deva.__version__)

使用示例

实时日志监控

监控日志文件并告警错误:

from deva import from_textfile, log, warn, Deva

# 监控日志文件
s = from_textfile('/var/log/app.log', start=True)

# 过滤错误日志并告警
s.filter(lambda line: 'ERROR' in line) >> warn

# 所有日志输出
s >> log

Deva.run()

数据采集与存储

from deva import timer, NB, log, Deva

# 定时获取数据
s = timer(interval=5, func=lambda: {'price': 100.5}, start=True)

# 存储到命名空间
db = NB('prices', key_mode='time')
s >> db

# 同时输出日志
s >> log

Deva.run()

跨进程通信

生产者:

from deva import bus, timer, Deva

# 每秒发送数据到 bus
timer(interval=1, func=lambda: {'event': 'tick'}) >> bus

Deva.run()

消费者:

from deva import bus, log, Deva

# 从 bus 接收并处理数据
bus.filter(lambda x: isinstance(x, dict)).map(lambda x: x['event']) >> log

Deva.run()

Web 可视化面板

from deva import timer, Stream, Deva

# 创建多个数据流
s1 = timer(interval=1, func=lambda: '流 1 数据', start=True, name='流 1')
s2 = timer(interval=2, func=lambda: '流 2 数据', start=True, name='流 2')

# 生成 Web 页面
s1.webview('/stream1')
s2.webview('/stream2')

# 启动服务
Deva.run()

文档导航

文档类型

链接

快速开始

source/quickstart.rst

使用手册

source/manual_cn.rst

安装指南

source/installation.rst

示例集合

deva/examples/README.md

API 参考

source/api.rst

完整文档目录详见项目仓库:https://github.com/sostc/deva/tree/master/docs

社区与支持

源代码仓库

问题反馈

许可证

Copyright © 2018-2026 spark

本项目采用 MIT 许可证。详见 LICENSE 文件。

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

deva-1.4.2.tar.gz (417.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

deva-1.4.2-py3-none-any.whl (428.9 kB view details)

Uploaded Python 3

File details

Details for the file deva-1.4.2.tar.gz.

File metadata

  • Download URL: deva-1.4.2.tar.gz
  • Upload date:
  • Size: 417.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.0.1 CPython/3.13.12

File hashes

Hashes for deva-1.4.2.tar.gz
Algorithm Hash digest
SHA256 90776d82f07c49d6e167e8a3ff7667e3f7f0721075ace11d64fe3afa75dd4d33
MD5 895ee8997fde6b32cecac515ab3d1c48
BLAKE2b-256 8a7456642155d9f7d2b35b14d5fa9f87781a04770624f6e0b6ef118664e17858

See more details on using hashes here.

File details

Details for the file deva-1.4.2-py3-none-any.whl.

File metadata

  • Download URL: deva-1.4.2-py3-none-any.whl
  • Upload date:
  • Size: 428.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.0.1 CPython/3.13.12

File hashes

Hashes for deva-1.4.2-py3-none-any.whl
Algorithm Hash digest
SHA256 6dd55824bb483b16ace94e603649c9e5f59201cda0e805cef63bf2934b826c6a
MD5 e8227bb70bfa31cd7100e6ba6f02833c
BLAKE2b-256 62d93aedfc1f78e73c22898b417c6c4846c1580918eb46ba522f5fe094fb0449

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page