Skip to main content

A lightweight Python Web/TCP server framework implemented with pure Python standard library

Project description

PyLiteFlux

PyLiteFlux 是一个轻量级的Python Web/TCP服务器框架,完全基于Python标准库实现,不依赖任何第三方包。它支持HTTP和TCP协议,提供了简单而强大的API来构建网络应用。适合用于学习和理解Python后端框架的概念和实现原理。

特性

  • 支持HTTP和TCP服务器
  • 支持TCP长连接和消息推送
  • 支持后台任务处理
  • 内置中间件系统
  • 简单的路由注册
  • 支持配置管理
  • 支持请求上下文

安装

pip install pyliteflux

快速开始

基本使用

from pyliteflux import Server, Route

server = Server()

@Route.register("/hello", method="GET")
def hello(request):
    return {"message": "Hello, World!"}

server.run()

TCP长连接和消息推送示例

下面是一个简单的聊天系统示例,展示了如何使用TCP长连接和消息推送功能:

from pyliteflux import Server, Route
import time

server = Server()
server.config.tcp_keep_alive = True  # 启用TCP长连接

# HTTP接口:发送消息给指定客户端
@Route.register("/hello", method="GET")
def hello(request):
    # 从查询参数中获取目标client_id
    target_client = request.query_params.get("client_id")
    
    def heavy_task():
        time.sleep(5)  # 模拟耗时操作
        server.push_message("Hello via HTTP GET!", target_client)
    
    request.bg.add(heavy_task)
    return {
        "message": "Hello via HTTP GET!",
        "target_client": target_client,
        "query_params": request.query_params
    }

# TCP接口:列出所有活跃连接
@Route.register("clients", protocol="tcp")
def list_clients(request):
    clients = list(server._active_connections.keys())
    return {
        "message": "Active TCP clients",
        "clients": clients,
        "count": len(clients)
    }

# TCP接口:订阅消息
@Route.register("subscribe", protocol="tcp")
def subscribe_tcp(request):
    client_id = f"{request.client_address[0]}:{request.client_address[1]}"
    return {"message": "subscribed", "client_id": client_id}

server.run()

主要功能

HTTP服务器

  • 支持GET、POST、PUT、DELETE方法
  • 支持路由参数
  • 支持查询参数
  • 支持JSON和表单数据

TCP服务器

  • 支持长连接
  • 支持消息推送
  • 支持JSON和XML数据格式
  • 支持自定义命令

后台任务

  • 支持异步任务处理
  • 任务队列管理
  • 线程池执行

中间件系统

  • 请求处理中间件
  • 响应处理中间件
  • 上下文管理中间件

中间件和全局上下文示例

中间件使用

中间件可以处理请求和响应,实现认证、日志、性能监控等功能:

from pyliteflux import Server, Route, Middleware

# 定义认证中间件
class AuthMiddleware(Middleware):
    def process_request(self, request):
        # 检查认证token
        token = request.headers.get("Authorization")
        if not token:
            return {"error": "Unauthorized"}
        
        # 将认证信息存储在g对象中
        g.user = self.validate_token(token)
        return None  # 继续处理请求
    
    def process_response(self, request, response):
        # 处理响应
        if isinstance(response, dict):
            response["authenticated"] = hasattr(g, "user")
        return response
    
    def validate_token(self, token):
        # 实际项目中这里应该验证token
        return {"id": 1, "name": "user1"}

# 定义日志中间件
class LogMiddleware(Middleware):
    def process_request(self, request):
        # 记录请求开始时间
        g.start_time = time.time()
        return None
    
    def process_response(self, request, response):
        # 计算请求处理时间
        duration = time.time() - g.start_time
        Logger.info(f"Request processed in {duration:.2f}s")
        return response

# 使用中间件
server = Server()
server.middleware_manager.add_middleware(AuthMiddleware())
server.middleware_manager.add_middleware(LogMiddleware())

# 在路由中使用g对象
@Route.register("/user", method="GET")
def get_user(request):
    if hasattr(g, "user"):
        return {
            "message": "User info",
            "user": g.user,
            "request_id": g.request_id  # 每个请求的唯一ID
        }
    return {"error": "User not found"}

server.run()

全局上下文使用

g对象是请求级别的全局对象,可以在整个请求过程中共享数据:

from pyliteflux import Server, Route, g

server = Server()

# 在中间件中设置数据
class DataMiddleware(Middleware):
    def process_request(self, request):
        g.db = Database()  # 假设的数据库连接
        g.cache = Cache()  # 假设的缓存连接
        return None
    
    def process_response(self, request, response):
        # 清理资源
        if hasattr(g, "db"):
            g.db.close()
        if hasattr(g, "cache"):
            g.cache.close()
        return response

# 在路由中使用g对象
@Route.register("/data", method="GET")
def get_data(request):
    # 使用g对象中的资源
    data = g.db.query("SELECT * FROM users")
    cached_data = g.cache.get("users")
    
    return {
        "data": data,
        "cached": cached_data,
        "request_id": g.request_id
    }

# 在后台任务中使用g对象
@Route.register("/async", method="GET")
def async_task(request):
    def background_job():
        # 注意:后台任务有自己的g对象实例
        g.task_id = uuid.uuid4()
        Logger.info(f"Background task {g.task_id} started")
        
    request.bg.add(background_job)
    return {"message": "Task scheduled"}

server.run()

实际应用场景

  1. 用户认证和授权:
class AuthMiddleware(Middleware):
    def process_request(self, request):
        token = request.headers.get("Authorization")
        if token:
            g.user = self.authenticate(token)
            g.permissions = self.get_permissions(g.user)
        return None

@Route.register("/admin", method="GET")
def admin_panel(request):
    if not hasattr(g, "permissions") or "admin" not in g.permissions:
        return {"error": "Access denied"}, 403
    return {"message": "Welcome, admin!"}
  1. 性能监控:
class PerformanceMiddleware(Middleware):
    def process_request(self, request):
        g.start_time = time.time()
        g.sql_queries = []
        return None
    
    def process_response(self, request, response):
        duration = time.time() - g.start_time
        metrics = {
            "duration": duration,
            "sql_queries": len(g.sql_queries),
            "memory_usage": self.get_memory_usage()
        }
        Logger.info(f"Performance metrics: {metrics}")
        return response
  1. 请求跟踪:
class TracingMiddleware(Middleware):
    def process_request(self, request):
        g.trace_id = str(uuid.uuid4())
        g.span_id = str(uuid.uuid4())
        g.traces = []
        return None
    
    def process_response(self, request, response):
        if isinstance(response, dict):
            response["trace_id"] = g.trace_id
            response["traces"] = g.traces
        return response

@Route.register("/trace", method="GET")
def traced_endpoint(request):
    g.traces.append({"event": "processing", "time": time.time()})
    result = process_data()
    g.traces.append({"event": "completed", "time": time.time()})
    return {"data": result}

这些示例展示了中间件和g对象的强大功能:

  1. 请求前后的处理
  2. 资源的自动管理
  3. 请求级别的数据共享
  4. 性能监控和日志记录
  5. 认证和授权
  6. 请求跟踪和调试

中间件和g对象的组合使用可以大大提高开发效率和代码质量。

配置选项

server = Server()
server.config.update(
    http_host="127.0.0.1",
    http_port=8000,
    tcp_host="127.0.0.1",
    tcp_port=8001,
    tcp_keep_alive=True,  # 启用TCP长连接
    info=True  # 启用日志
)

API文档

路由装饰器

@Route.register(path, method="GET", protocol="http")
  • path: 路由路径
  • method: HTTP方法(GET/POST/PUT/DELETE)
  • protocol: 协议类型(http/tcp)

服务器方法

server.run()  # 启动服务器
server.stop()  # 停止服务器
server.push_message(message, client_id=None)  # 推送消息

请求对象

request.query_params  # 查询参数
request.form_data    # 表单数据
request.json_data    # JSON数据
request.headers      # 请求头
request.bg.add(task) # 添加后台任务

响应格式

PyLiteFlux支持多种响应格式,包括普通JSON响应、流式响应和自定义JSON响应:

1. 普通JSON响应

@Route.register("/hello", method="GET")
def hello(request):
    return {"message": "Hello, World!"}  # 自动转换为JSON响应

2. 流式响应

流式响应适用于大数据传输或需要实时推送数据的场景:

from pyliteflux import StreamResponse

@Route.register("/stream", method="GET")
def stream_data(request):
    def generate_data():
        for i in range(10):
            time.sleep(1)  # 模拟数据生成
            yield f"data: {i}\n\n"
    
    return StreamResponse(
        generate_data(),
        content_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive"
        }
    )

# 带参数的流式响应
@Route.register("/download", method="GET")
def download_file(request):
    def file_reader():
        with open("large_file.txt", "rb") as f:
            while chunk := f.read(8192):
                yield chunk
    
    return StreamResponse(
        file_reader(),
        content_type="application/octet-stream",
        headers={
            "Content-Disposition": "attachment; filename=large_file.txt"
        }
    )

3. HTTPJSONResponse响应

当需要更细粒度地控制JSON响应时,可以使用HTTPJSONResponse:

from pyliteflux import HTTPJSONResponse

@Route.register("/api/user", method="GET")
def get_user(request):
    user_data = {"id": 1, "name": "John"}
    return HTTPJSONResponse(
        data=user_data,
        status=200,
        headers={
            "X-Custom-Header": "value",
            "Access-Control-Allow-Origin": "*"
        }
    )

# 错误响应示例
@Route.register("/api/error", method="GET")
def error_response(request):
    return HTTPJSONResponse(
        data={"error": "Not Found", "code": "404"},
        status=404,
        headers={"X-Error-Type": "NotFound"}
    )

# 自定义序列化示例
@Route.register("/api/custom", method="GET")
def custom_serialization(request):
    class CustomEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj, datetime):
                return obj.isoformat()
            return super().default(obj)
    
    data = {
        "timestamp": datetime.now(),
        "message": "Custom serialization"
    }
    
    return HTTPJSONResponse(
        data=data,
        json_encoder=CustomEncoder,
        headers={"Content-Type": "application/json"}
    )

4. 组合使用示例

@Route.register("/api/complex", method="GET")
def complex_response(request):
    response_type = request.query_params.get("type", "json")
    
    if response_type == "stream":
        def stream():
            for i in range(5):
                yield json.dumps({"count": i}) + "\n"
        return StreamResponse(stream(), content_type="application/x-ndjson")
    
    elif response_type == "error":
        return HTTPJSONResponse(
            {"error": "Bad Request"},
            status=400,
            headers={"X-Error-Details": "Invalid type"}
        )
    
    else:
        return {"message": "Regular JSON response"}  # 自动转换为JSON

参数使用说明

服务器配置参数

服务器实例支持多种配置参数,可以通过config对象进行设置:

from pyliteflux import Server

server = Server()

# 方式1:通过属性设置
server.config.http_host = "127.0.0.1"
server.config.http_port = 8000
server.config.tcp_keep_alive = True

# 方式2:通过update方法批量设置
server.config.update(
    http_host="127.0.0.1",
    http_port=8000,
    tcp_host="127.0.0.1",
    tcp_port=8001,
    tcp_keep_alive=True,
    info=True
)

配置参数说明:

参数 类型 默认值 说明
http_host str "127.0.0.1" HTTP服务器主机地址
http_port int 8000 HTTP服务器端口
tcp_host str "127.0.0.1" TCP服务器主机地址
tcp_port int 8001 TCP服务器端口
tcp_keep_alive bool False 是否启用TCP长连接
info bool True 是否启用日志输出

路由参数

路由装饰器支持多种参数配置:

from pyliteflux import Route

# HTTP路由
@Route.register("/hello", method="GET")  # 基本GET请求
@Route.register("/user", method="POST")  # POST请求
@Route.register("/user/{id}", method="GET")  # 带路径参数的路由
@Route.register("/files/*", method="GET")  # 通配符路由

# TCP路由
@Route.register("command", protocol="tcp")  # TCP命令路由

路由参数说明:

参数 类型 默认值 说明
path str 必填 路由路径
method str "GET" HTTP方法(GET/POST/PUT/DELETE)
protocol str "http" 协议类型(http/tcp)

请求参数

HTTP请求可以通过多种方式传递参数:

# 1. 查询参数
@Route.register("/search", method="GET")
def search(request):
    keyword = request.query_params.get("keyword")
    page = request.query_params.get("page", "1")
    return {"keyword": keyword, "page": page}

# 2. 路径参数
@Route.register("/user/{user_id}/posts/{post_id}", method="GET")
def get_post(request):
    user_id = request.path_params["user_id"]
    post_id = request.path_params["post_id"]
    return {"user_id": user_id, "post_id": post_id}

# 3. 表单数据
@Route.register("/upload", method="POST")
def upload(request):
    file_name = request.form_data.get("file_name")
    content = request.form_data.get("content")
    return {"file_name": file_name}

# 4. JSON数据
@Route.register("/api/data", method="POST")
def handle_json(request):
    data = request.json_data
    return {"received": data}

TCP连接参数

TCP连接相关的参数和使用方式:

# 1. 启用TCP长连接
server = Server()
server.config.tcp_keep_alive = True

# 2. 发送消息给特定客户端
@Route.register("/push", method="GET")
def push_message(request):
    client_id = request.query_params.get("client_id")
    message = request.query_params.get("message", "Hello!")
    server.push_message(message, client_id)
    return {"status": "sent"}

# 3. 广播消息给所有客户端
@Route.register("/broadcast", method="GET")
def broadcast(request):
    message = request.query_params.get("message", "Broadcast!")
    server.push_message(message)  # 不指定client_id则广播
    return {"status": "broadcasted"}

中间件参数

中间件可以通过process_request和process_response方法处理请求和响应:

class CustomMiddleware(Middleware):
    def __init__(self, **options):
        self.options = options
    
    def process_request(self, request):
        # 处理请求
        request.custom_attr = self.options.get("custom_attr")
        return None
    
    def process_response(self, request, response):
        # 处理响应
        if isinstance(response, dict):
            response["processed_by"] = self.options.get("name", "CustomMiddleware")
        return response

# 使用中间件
server = Server()
server.middleware_manager.add_middleware(
    CustomMiddleware(
        name="MyMiddleware",
        custom_attr="custom_value"
    )
)

后台任务参数

后台任务支持参数传递:

@Route.register("/task", method="GET")
def async_task(request):
    def background_job(user_id, task_type="default"):
        time.sleep(5)
        Logger.info(f"Processing {task_type} task for user {user_id}")
    
    # 添加带参数的后台任务
    user_id = request.query_params.get("user_id", "0")
    request.bg.add(
        background_job,
        user_id,  # 位置参数
        task_type="important"  # 关键字参数
    )
    
    return {"message": "Task scheduled"}

这些参数的使用示例展示了框架的灵活性和功能性。你可以根据需要组合使用这些参数来实现各种功能。所有参数都有合理的默认值,使框架既易于使用又足够灵活。

需要注意的是:

  1. 参数名称区分大小写
  2. 必填参数没有默认值时必须提供
  3. 某些参数组合可能会相互影响
  4. 建议遵循Python的命名规范

注意事项

  1. TCP长连接需要显式启用:server.config.tcp_keep_alive = True
  2. 后台任务会在独立的线程池中
  3. 推送消息时如果不指定client_id,将广播给所有连接的客户端

许可证

MIT License

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyliteflux-0.1.2.tar.gz (17.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pyliteflux-0.1.2-py3-none-any.whl (16.0 kB view details)

Uploaded Python 3

File details

Details for the file pyliteflux-0.1.2.tar.gz.

File metadata

  • Download URL: pyliteflux-0.1.2.tar.gz
  • Upload date:
  • Size: 17.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.4 CPython/3.10.15 Windows/10

File hashes

Hashes for pyliteflux-0.1.2.tar.gz
Algorithm Hash digest
SHA256 e40b9de74711ee7cf12e42ce1c70d5637c7b95b4313f0636e1782a82bc1c1a6a
MD5 bdc1a241f633ceb8b831846f5ed448a1
BLAKE2b-256 00a1ab86ceefbe12d434d9d403ed086075b11e704d887c511f84f7d1f54a5c30

See more details on using hashes here.

File details

Details for the file pyliteflux-0.1.2-py3-none-any.whl.

File metadata

  • Download URL: pyliteflux-0.1.2-py3-none-any.whl
  • Upload date:
  • Size: 16.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.4 CPython/3.10.15 Windows/10

File hashes

Hashes for pyliteflux-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 16f56e0ac8a13d6182ef91c584fe9f3c0750efeb81529f607b63aa0559621673
MD5 517d9a044ab1e8b683bb5f3a0ceb3bc8
BLAKE2b-256 2ef2f8f11f16e74f4b3f889516fa91279f766e08681297ce6c2dae9ea25692ce

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page