AstraFlux Description

Overview

AstraFlux is a lightweight distributed service management framework. Its core capabilities include:

- Simplifying the definition, registration, and startup of distributed RPC services;
- Scheduling and executing asynchronous Worker tasks;
- Integrating MongoDB, Redis, and RabbitMQ middleware to unify data storage, caching, and message communication;
- Built-in task scheduling, log management, process monitoring, and other basic capabilities that reduce the cost of distributed service development.

Configuration Instructions

2.1 Configuration File Structure (config.yaml)

The framework centrally manages core configurations such as middleware connections and logs through config.yaml, with the following example:

# MongoDB Configuration
mongodb:
  host: 127.0.0.1
  port: 27017
  db: astraflux
  username: scheduleAdmin
  password: scheduleAdminPassword

# Redis Configuration
redis:
  host: 127.0.0.1
  port: 6379
  password: scheduleAdminPassword
  db_index: 8

# RabbitMQ Configuration
rabbitmq:
  host: 127.0.0.1
  port: 5672
  username: scheduleAdmin
  password: scheduleAdminPassword

# Logger Configuration
logger:
  path: logs  # Log storage directory
  level: INFO # Log level (DEBUG/INFO/WARN/ERROR)

# Web Manage Configuration
web:
  port: 7860
  username: scheduleAdmin
  password: scheduleAdminPassword
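
Before starting the framework, it can help to check that the loaded configuration contains the middleware and logger sections shown above. The following is a minimal sketch and not part of AstraFlux: `REQUIRED_KEYS` and `missing_config_keys` are hypothetical names, the key list is simply copied from the example (the framework may treat some keys as optional), and the input is assumed to be the dictionary produced by any YAML loader.

```python
from typing import Dict, List

# Assumption: these sections and keys mirror the example config.yaml;
# the framework itself may accept fewer or more keys.
REQUIRED_KEYS: Dict[str, List[str]] = {
    "mongodb": ["host", "port", "db", "username", "password"],
    "redis": ["host", "port", "password", "db_index"],
    "rabbitmq": ["host", "port", "username", "password"],
    "logger": ["path", "level"],
}


def missing_config_keys(config: dict) -> List[str]:
    """Return dotted names of required sections or keys absent from config."""
    missing: List[str] = []
    for section, keys in REQUIRED_KEYS.items():
        values = config.get(section)
        if not isinstance(values, dict):
            # The whole section is missing or is not a mapping
            missing.append(section)
            continue
        missing.extend(f"{section}.{key}" for key in keys if key not in values)
    return missing
```

An empty list means every section and key from the example is present.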

Framework Usage Steps

3.1 Environment Preparation

Ensure the core framework dependencies are installed:

pip install astraflux

3.3 Define Custom Services

You need to implement RPC services (handling remote calls) and Worker services (handling asynchronous tasks) separately, as shown in the example below (server/test_server.py):

from astraflux import ServiceConstructor, WorkerConstructor, rpc_decorator
import time

# 1. Define RPC Service (handling remote calls)
class TestRpcService(ServiceConstructor):
    # Must define service name for registration and identification
    service_name = "test_server"

    # Mark RPC methods with @rpc_decorator to support external calls
    @rpc_decorator
    def get_version(self):
        """Example: Return service version"""
        return {"code": 200, "data": "test_server v1.0"}

    @rpc_decorator
    def calculate(self, a, b):
        """Example: Receive parameters and return calculation results"""
        return {"code": 200, "data": a + b}

# 2. Define Worker Service (handling asynchronous tasks)
class TestWorkerService(WorkerConstructor):
    # Must define Worker name, consistent with RPC service name
    worker_name = "test_server"

    def run(self, data):
        """
        Must implement the run method as the task execution entry
        :param data: Task input parameters (dictionary format)
        """
        self.logger.info(f"Start executing task with parameters: {data}")
        # Simulate task execution
        time.sleep(2)
        self.logger.info(f"Task execution completed, Task ID: {data.get('task_id')}")
        return {"status": "success"}

3.4 Framework Initialization and Service Registration

Complete framework initialization, service registration and startup in the startup entry file (main.py):

# -*- coding: utf-8 -*-
import os
from astraflux import AstraFlux, launch_register, launch_start
# Import custom services
from server.test_server import TestRpcService, TestWorkerService

# 1. Get current directory (for locating configuration files)
current_dir = os.path.dirname(__file__)

# 2. Initialize framework (load configuration file, singleton mode)
AstraFlux(
    yaml_path=f"{current_dir}/config.yaml",  # Configuration file path
    current_dir=current_dir                 # Framework working directory
)

# 3. Register custom services (RPC+Worker services)
launch_register(services=[
    TestRpcService,
    TestWorkerService
])

# 4. Start all registered services
launch_start()

# Keep main process running (sleep instead of busy-waiting so the loop does not pin a CPU core)
if __name__ == "__main__":
    import time
    while True:
        time.sleep(1)
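
The configuration path in main.py is derived from the location of the script. As an alternative sketch (not something the framework requires), `pathlib` can resolve the script location to an absolute path first, so the result does not depend on the directory the process was started from. `config_path_for` is a hypothetical helper name.

```python
from pathlib import Path


def config_path_for(script_file: str, config_name: str = "config.yaml") -> Path:
    """Return the absolute path of config_name located next to script_file."""
    return Path(script_file).resolve().parent / config_name

# In main.py this would be called as config_path_for(__file__)
```

The result can then be passed on as `yaml_path=str(config_path_for(__file__))`.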

Core Interface Examples

4.1 RPC Service Call

Call registered RPC services through the framework’s built-in proxy_call interface:

from astraflux import proxy_call

# Call get_version method of test_server
result1 = proxy_call(
    service_name="test_server",  # Target service name
    method_name="get_version"    # Target method name
)
print(result1)  # Output: {"code": 200, "data": "test_server v1.0"}

# Call calculate method of test_server (with parameters)
result2 = proxy_call(
    service_name="test_server",
    method_name="calculate",
    a=10, b=20  # Method input parameters
)
print(result2)  # Output: {"code": 200, "data": 30}
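
The example service methods return a dictionary with `code` and `data` keys, so a caller may want to unwrap that envelope in one place. The sketch below assumes, as in the example service, that `code == 200` signals success and any other value signals failure. `RpcError` and `unwrap_rpc_result` are hypothetical names, not framework APIs.

```python
from typing import Any, Dict


class RpcError(RuntimeError):
    """Raised when an RPC result does not carry the success code (hypothetical)."""


def unwrap_rpc_result(result: Dict[str, Any]) -> Any:
    """Return result['data'] on success, otherwise raise RpcError."""
    code = result.get("code")
    if code != 200:
        raise RpcError(f"RPC call failed with code {code!r}: {result!r}")
    return result.get("data")
```

With the calls above, `unwrap_rpc_result(result2)` would yield the bare sum rather than the whole dictionary.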

4.2 Submit Worker Asynchronous Tasks

Submit asynchronous tasks to the specified Worker through the task_submit interface:

from astraflux import task_submit, subtask_batch_create

# Submit task to test_server Worker
task_id = task_submit(
    worker_name="test_server",  # Target Worker name
    data={                      # Task parameters
        "task_id": "task_001",
        "param1": "value1",
        "param2": 123
    }
)
print(f"Task submitted successfully, Task ID: {task_id}")

# Create subtasks, the framework automatically monitors task status
# and updates the main task after all subtasks are completed
subtask_batch_create(
    subtask_queue='sub_test_server',
    source_id='task_001',
    subtask_list=[
        {'x': 1, 'y': 2},
        {'x': 2, 'y': 3},
    ]
)

4.3 Middleware Operation Interfaces

4.3.1 MongoDB Operations

from astraflux import mongodb_find_from_task, mongodb_find_paginated_from_task

# 1. Query task data
task_data = mongodb_find_from_task(
    query={"task_id": "task_001"},  # Query conditions
    fields={"_id": 0}    # Return fields
)

# Pagination query
mongodb_find_paginated_from_task(
    query={"task_id": "task_001"},
    fields={},
    limit=100,
    skip=0
)
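
To walk through a large result set, the `limit`/`skip` pair can be advanced page by page. The sketch below is framework-agnostic: `fetch_page` is a hypothetical callable standing in for a paginated query, assumed to accept `limit` and `skip` keyword arguments and to return a list of documents. The return type of `mongodb_find_paginated_from_task` is not documented here, so a thin wrapper may be needed to adapt it.

```python
from typing import Any, Callable, Dict, Iterator, List

Document = Dict[str, Any]


def iter_documents(
    fetch_page: Callable[..., List[Document]], limit: int = 100
) -> Iterator[Document]:
    """Yield documents page by page until a short or empty page is returned."""
    if limit <= 0:
        raise ValueError("limit must be a positive integer")
    skip = 0
    while True:
        page = fetch_page(limit=limit, skip=skip)
        yield from page
        if len(page) < limit:
            break  # last page reached
        skip += limit
```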

4.3.2 Redis Operations

from astraflux import redis_update_max_process

redis_update_max_process(
    unique_id='test_server_127.0.0.1',  # Service name_IP
    new_value=10  # Set the maximum number of worker processes for the service on the corresponding IP
)
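
The `unique_id` joins the service name and the host IP with an underscore, as the comment in the example indicates. A small helper can keep that format in one place. `build_unique_id` is a hypothetical name, and the address check with the standard `ipaddress` module is an addition of this sketch rather than something the framework is known to do.

```python
import ipaddress


def build_unique_id(service_name: str, ip: str) -> str:
    """Build '<service_name>_<ip>' after checking that ip is a valid address."""
    ipaddress.ip_address(ip)  # raises ValueError for malformed addresses
    return f"{service_name}_{ip}"
```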

4.4 Task Scheduling Interfaces

from astraflux import add_scheduled_job, start_scheduler

# Add scheduled task (execute every minute)
add_scheduled_job(
    job_id="test_job",
    func=lambda: print("Scheduled task executed"),
    trigger="interval",
    minutes=1
)

# Start the scheduler manually; this is not needed when launch_start() is used
start_scheduler()

4.5 Log Usage Example

The framework has a built-in logging module that can be used directly in services:

from astraflux import logger, WorkerConstructor

# Global logger usage
logger.info("Framework started successfully")
logger.error("Service startup failed", exc_info=True)

# Worker service built-in logger
class TestWorkerService(WorkerConstructor):
    worker_name = "test_server"
    def run(self, data):
        self.logger.debug(f"Task parameters: {data}")  # DEBUG level log
        self.logger.warning("Task execution timeout warning")  # WARN level log

4.6 Thread/Process Pool Executor Usage

The framework provides thread and process pool executors with built-in retry mechanisms for reliable task execution. Process executors are suitable for CPU-bound tasks, while thread executors are better for I/O-bound operations.

import time
from astraflux import process_executor, thread_executor

def test_func(x):
    """Test function that runs in an infinite loop printing values"""
    while True:
        print(x)
        time.sleep(1)

if __name__ == '__main__':
    # Important Note for Windows Systems:
    # - Process executor calls must be within __main__ block if not used inside WorkerFunction
    # - Thread executor can be used in WorkerFunction or __main__ block

    # Initialize and use process pool executor
    p = process_executor(max_workers=5, retry_delay=1.0)
    # Submit tasks to process pool
    p.submit(func=test_func, x=1)
    p.submit(func=test_func, x=2)
    # Start process pool executor
    p.start()

    # Initialize and use thread pool executor
    t = thread_executor(max_workers=5, retry_delay=1.0)
    # Submit tasks to thread pool (automatically starts when first task is submitted)
    t.submit(func=test_func, x=3)
    t.submit(func=test_func, x=4)

    # Keep main process running to prevent executor shutdown
    # (sleep instead of busy-waiting so the loop does not pin a CPU core)
    while True:
        time.sleep(1)

**Executor Configuration Parameters**:
- ``max_workers``: Maximum number of worker threads/processes (default: 5)
- ``retry_delay``: Base delay time (in seconds) between retry attempts for failed tasks (default: 1.0)

**Key Differences Between Executors**:
1. **Process Executor**:
   - Requires explicit ``start()`` call to begin task execution
   - Must be used within ``__main__`` block on Windows (outside WorkerFunction)
   - Ideal for CPU-bound tasks that benefit from multiple cores
   - Uses inter-process communication with task serialization

2. **Thread Executor**:
   - Automatically starts when first task is submitted
   - No special execution context requirements
   - Ideal for I/O-bound tasks with frequent waiting operations
   - Lower overhead compared to process executor

**Executor Retry Mechanism**:
Both executors automatically retry failed tasks with an increasing backoff:
- Retry delay grows linearly with each attempt (retry_delay * retry_count)
- Maximum retry attempts default to 3 (configurable per task)
- Failed tasks after max retries are tracked and can be retrieved via ``get_failed_tasks()``
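
The delay rule above (retry_delay * retry_count, up to a maximum number of retries) can be sketched in plain Python. This is an illustration of the stated rule, not the executors' actual implementation: `run_with_retry` and its `sleep` parameter are hypothetical, and the `sleep` hook exists only so the waiting can be replaced in tests.

```python
import time
from typing import Any, Callable


def run_with_retry(
    func: Callable[..., Any],
    *args: Any,
    max_retries: int = 3,
    retry_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
    **kwargs: Any,
) -> Any:
    """Call func; after each failure wait retry_delay * retry_count and retry."""
    retry_count = 0
    while True:
        try:
            return func(*args, **kwargs)
        except Exception:
            retry_count += 1
            if retry_count > max_retries:
                raise  # retries exhausted: surface the last error
            sleep(retry_delay * retry_count)
```

With the defaults, a task that keeps failing waits 1, 2, and 3 seconds between attempts before the last error is raised.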

For more interface usage, please refer to astraflux.interface.

Astra Agent Usage

5.1 Overview

astra_agents is a built-in AI agent module of the AstraFlux framework. It provides two agent implementations:

  • AstraNativeAgent — A native AI agent that uses the OpenAI SDK directly, supports tool registration, conversation history management, and automatic tool call handling without relying on third-party agent frameworks.

  • OpenClawChat — A client that communicates with an OpenClaw AI gateway, managing system prompts, skill learning tasks, and streaming chat interactions.

Both agents share a common skill system located at astraflux/astra_agents/skill/. The framework also supports expandable skills (user-defined skill plugins).

5.2 Skill System

The agent skill system is a plug-in system that provides AI agents with the ability to call various local and external tools. The system has two types of skills:

astraflux/astra_agents/skill/          # Built-in system skills (read-only)
    dirs/                              # Directory/file system operations
    exec/                              # Shell command execution (cross-platform)
    files/                             # File read/write (multi-format)
    Internet/                          # Web search, fetch, download, URL check

<project_dir>/<expand_skill_directory>/  # User-defined expandable skills (read-only)

Built-in Skills:

| Skill | Tools | Description |
|---|---|---|
| dirs | create_directory, remove_directory, list_directory, rename_directory, get_directory_info, set_permissions, get_permissions | Create, remove, list, rename directories; query and set file/directory permission information. |
| exec | execute | Cross-platform shell command execution (Windows/Linux/macOS/WSL). Supports various shells (cmd, PowerShell, bash, zsh), environment variable injection, timeout, and working directory settings. |
| files | read_file, write_file, show_format_example | Unified file reading and writing for multiple formats: txt, json, csv, xml, yaml, toml, ini, env, excel. Automatically detects format by file extension or explicit specification. |
| Internet | search_web, fetch_webpage, download_file, check_url | Search (supports DuckDuckGo, Google, Baidu, Yandex, Yahoo, Bing), fetch web page content as markdown/text, download files, check URL accessibility. |

5.3 Configuration

The agents are configured in the AstraFlux config.yaml file:

# Example config.yaml entries for astra_agents

openai:  # Configuration for AstraNativeAgent
  model_api:
    server: https://api.openai.com/v1
    apikey: your-api-key-here
    name: gpt-4o  # or gpt-4o-mini, deepseek-chat, etc.
  temporary_directory: temporary_directory   # Writable directory for agent
  expand_skill_directory: expand_skill       # User skill plugin directory

openclaw:  # Configuration for OpenClawChat
  server: http://localhost:8120
  token: your-openclaw-token
  session_key: your-session-key
  temporary_directory: temporary_directory   # Writable directory for agent
  expand_skill_directory: expand_skill       # User skill plugin directory

Configuration fields:

  • server — API server URL (OpenAI-compatible API or OpenClaw gateway)

  • apikey / token — API authentication

  • name — Model name (e.g., gpt-4o, deepseek-chat)

  • temporary_directory — The only writable directory the agent can use

  • expand_skill_directory — Directory for user-defined skill plugins
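
Since `temporary_directory` is described as the only writable location, a tool that writes files may want to confirm a target path stays inside it. How AstraFlux enforces this restriction is not described here; the sketch below only illustrates one common way to perform such a check with `pathlib`, and `is_inside_directory` is a hypothetical name.

```python
from pathlib import Path


def is_inside_directory(base_dir: str, target: str) -> bool:
    """Return True if target resolves to base_dir itself or a path beneath it."""
    base = Path(base_dir).resolve()
    candidate = Path(target).resolve()  # collapses '..' segments and symlinks
    try:
        candidate.relative_to(base)
    except ValueError:
        return False
    return True
```

Resolving both paths first means that a target such as `temporary_directory/../other` is rejected even though it starts with the allowed prefix.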

5.4 AstraNativeAgent Basic Usage

# -*- coding: utf-8 -*-
import os
from astraflux import *

# 1. Initialize AstraFlux framework
current_dir = os.path.dirname(__file__)
AstraFlux(
    yaml_path=f"{current_dir}/config.yaml",
    current_dir=current_dir
)

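# 2. Chat with the agent (async)
import asyncio

def backcall(data):
    """Callback passed to chat(); prints whatever it receives."""
    print(data)

# NOTE: `o` is the agent instance; its creation is not shown here.
message = 'Hello !'
asyncio.run(o.chat(message, backcall))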

5.5 OpenClawChat Basic Usage

# -*- coding: utf-8 -*-
import os
from astraflux import AstraFlux, send_message_to_openclaw  # assumed export; adjust the import path if it differs

# 1. Initialize AstraFlux framework
current_dir = os.path.dirname(__file__)
AstraFlux(
    yaml_path=f"{current_dir}/config.yaml",
    current_dir=current_dir
)

# 2. Send a message (streaming)
response_gen = send_message_to_openclaw(
    user_message="List files in the current directory"
)
for chunk in response_gen:
    print(chunk, end="", flush=True)

Key Methods:

  • send_message_to_openclaw(user_message, user_id, prompt) — Send a message (returns streaming generator)

  • stream_chat(payload) — Low-level streaming API call

  • initialize_ai() — Called automatically on construction; sends system learning tasks

5.6 Custom Tool Registration

You can register custom Python functions as tools for AstraNativeAgent:

from astraflux.astra_agents.astra_agent import AstraAgentApi

# After creating the agent instance
@agent.register_tool
def get_weather(city: str) -> str:
    """Get current weather for a city."""
    return f"Weather in {city}: Sunny, 22C"

# Or call directly
def my_tool(name: str, count: int = 10) -> str:
    """My custom tool description."""
    return f"Processed {count} items for {name}"

agent.register_tool(my_tool, name="process_items", description="Process items for a user")

The agent automatically extracts parameter schemas from Python type annotations and builds OpenAI-compatible tool definitions.
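
The general technique can be illustrated with the standard `inspect` module. This is a minimal sketch of schema extraction, not the agent's internal code: `build_tool_schema` and the type mapping are hypothetical, only a few primitive annotations are handled, and anything unrecognized falls back to `"string"`. The output follows the function-calling tool layout used by OpenAI-compatible chat APIs.

```python
import inspect
from typing import Any, Callable, Dict, List, Optional

# Assumption: a small mapping from Python annotations to JSON Schema types
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def build_tool_schema(
    func: Callable[..., Any],
    name: Optional[str] = None,
    description: Optional[str] = None,
) -> Dict[str, Any]:
    """Build an OpenAI-style tool definition from a function signature."""
    properties: Dict[str, Dict[str, str]] = {}
    required: List[str] = []
    for param_name, param in inspect.signature(func).parameters.items():
        json_type = _JSON_TYPES.get(param.annotation, "string")
        properties[param_name] = {"type": json_type}
        if param.default is inspect.Parameter.empty:
            required.append(param_name)  # no default value: mandatory argument
    return {
        "type": "function",
        "function": {
            "name": name or func.__name__,
            "description": description or inspect.getdoc(func) or "",
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
        },
    }
```

Applied to `my_tool` above, `name` would be listed as a required string and `count` as an optional integer.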

5.7 Expandable Skills

To create your own skill plugin, follow the built-in skill conventions:

  1. Create a subdirectory in the expand_skill_directory configured in config.yaml

  2. Add an __init__.py containing @function_tool-decorated functions

  3. On initialization, the agent automatically scans and registers all __init__.py files in the expandable skill directory
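
The discovery step can be pictured as a directory scan. The sketch below is an illustration under assumptions, not the agent's loader: `find_skill_packages` is a hypothetical name, and it assumes skills sit exactly one level below the configured directory, as step 1 suggests. It only locates the `__init__.py` files and does not import or register anything.

```python
from pathlib import Path
from typing import List


def find_skill_packages(skill_root: str) -> List[Path]:
    """Return sorted __init__.py paths found one level below skill_root."""
    root = Path(skill_root)
    if not root.is_dir():
        return []
    return sorted(
        child / "__init__.py"
        for child in root.iterdir()
        if child.is_dir() and (child / "__init__.py").is_file()
    )
```

Subdirectories without an `__init__.py` are skipped, which matches the requirement in step 2.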

For more interface usage, please refer to astraflux.interface.
