
AstraFlux Description


1. Introduction

The AstraFlux framework is designed to help developers quickly build distributed task management and microservice systems, providing conveniences such as service registration, RPC calls, and task distribution and processing.

2. Directory Structure

The recommended project directory structure is as follows:

project_root/
├── servers/
│   ├── test_server.py       # Test service implementation
│   └── sub_test_server.py   # Sub-test service implementation
├── main.py                  # Service startup script
├── test.py                  # Function test script
└── config.yaml              # Configuration file

3. Configuration File

Create a config.yaml configuration file with the following content:

mongodb:
  host: 127.0.0.1
  port: 27017
  db: astraflux  # Default database name
  username: scheduleAdmin
  password: scheduleAdminPassword

redis:
  host: 127.0.0.1
  port: 6379
  password: scheduleAdminPassword

rabbitmq:
  host: 127.0.0.1
  port: 5672
  username: scheduleAdmin
  password: scheduleAdminPassword

logger:
  path: logs  # Log directory (relative to the working directory)
  level: INFO  # Log level
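
For orientation, the sketch below shows how these settings map onto standard connection URIs. This is an illustration only, assuming PyYAML and the conventional mongodb://, redis://, and amqp:// URI schemes; the framework itself loads config.yaml and builds its connections internally:

# -*- coding: utf-8 -*-
import yaml  # assumes PyYAML is installed

with open('config.yaml', 'r', encoding='utf-8') as f:
    cfg = yaml.safe_load(f)

mongo = cfg['mongodb']
mongo_uri = (f"mongodb://{mongo['username']}:{mongo['password']}"
             f"@{mongo['host']}:{mongo['port']}/{mongo['db']}")

redis = cfg['redis']
redis_uri = f"redis://:{redis['password']}@{redis['host']}:{redis['port']}/0"

mq = cfg['rabbitmq']
rabbitmq_uri = f"amqp://{mq['username']}:{mq['password']}@{mq['host']}:{mq['port']}/"

print(mongo_uri, redis_uri, rabbitmq_uri, sep='\n')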

4. Service Implementation

4.1 Basic Service Structure

Each service implements two core components:

  • A subclass of ServiceConstructor, which exposes the RPC interfaces

  • A subclass of WorkerConstructor, which processes distributed tasks

4.2 Test Service Example (test_server.py)

# -*- coding: utf-8 -*-
import time

from astraflux import ServiceConstructor, WorkerConstructor, rpc_decorator


class RpcFunction(ServiceConstructor):
    """RPC service implementation class"""
    service_name = 'test_server'  # Unique service identifier

    @rpc_decorator  # Mark as a remotely callable method
    def get_service_name(self):
        """Get service information"""
        return {"service_version": self.ipaddr}

    @rpc_decorator
    def test_func(self, **args):
        """Test RPC method, return incoming parameters"""
        return args


class WorkerFunction(WorkerConstructor):
    """Task processing class"""
    worker_name = 'test_server'  # Bound task queue name

    def run(self, data):
        """Core method for processing tasks"""
        self.logger.info(f"Received task: {data}")
        time.sleep(5)  # Simulate task processing time
        self.logger.info(f"Task completed: {data['task_id']}")

4.3 Sub-test Service Example (sub_test_server.py)

The structure is identical to test_server.py; only the service identifiers need to change:

# -*- coding: utf-8 -*-
import time

from astraflux import ServiceConstructor, WorkerConstructor, rpc_decorator


class RpcFunction(ServiceConstructor):
    service_name = 'sub_test_server'  # Sub-service name

    @rpc_decorator
    def get_service_name(self):
        return {"service_version": self.ipaddr}

    @rpc_decorator
    def test_func(self, **args):
        return args


class WorkerFunction(WorkerConstructor):
    worker_name = 'sub_test_server'  # Sub-service task queue name

    def run(self, data):
        self.logger.info(data)
        time.sleep(3)
        self.logger.info(f"sub worker done {data['task_id']}")

5. Service Startup Script (main.py)

Used to register and start services:

# -*- coding: utf-8 -*-
import os

from astraflux import AstraFlux

# Import custom services
from servers import test_server, sub_test_server

if __name__ == "__main__":
    # Get current directory
    current_dir = os.path.dirname(__file__)

    # Initialize the framework (load configuration file)
    af = AstraFlux(
        yaml_file=f'{current_dir}/config.yaml',
        current_dir=current_dir
    )

    # Register service list
    af.registry(services=[test_server, sub_test_server])

    # Start services (wait=True blocks the main process)
    af.start(wait=True)

6. Function Testing (test.py)

Used to test RPC calls and task submission:

# -*- coding: utf-8 -*-
import os

from astraflux import AstraFlux, proxy_call, task_submit_to_db, subtask_batch_create, snowflake_id

if __name__ == "__main__":
    current_dir = os.path.dirname(__file__)

    # Initialize the framework (load configuration)
    af = AstraFlux(
        yaml_file=f'{current_dir}/config.yaml',
        current_dir=current_dir
    )

    # 1. Test RPC call
    rpc_result = proxy_call(
        service_name='test_server',  # Target service name
        method_name='test_func',     # Target method name
        x=1, y=2                     # Incoming parameters
    )
    print("RPC call result:", rpc_result)

    # 2. Submit main tasks and subtasks
    for i in range(3):
        # Generate unique task ID
        main_task_id = snowflake_id()

        # Submit the main task to the test_server queue (persisted to the database)
        task_submit_to_db(
            queue_name='test_server',
            task_data={'task_id': main_task_id, 'index': i}
        )

        # Create 5 subtasks for each main task (submitted to the sub_test_server queue).
        # The scheduler monitors subtask status and automatically updates the source task
        # once all subtasks have completed.
        subtask_ids = subtask_batch_create(
            source_task_id=main_task_id,  # Associated main task ID
            subtask_queue='sub_test_server',  # Subtask queue
            subtask_list=[{'task_id': f'{main_task_id}_{j}', 'parent': main_task_id} for j in range(5)]
        )
        print(f"Subtask IDs of main task {main_task_id}:", subtask_ids)

7. Distributed Deployment

7.1 Multi-machine Deployment

When deploying services on different machines, you only need to modify the registered service list in main.py:

# Start only test_server
af.registry(services=[test_server])

# Or start only sub_test_server
af.registry(services=[sub_test_server])

7.2 Load Balancing

The framework automatically load-balances across instances that register the same service name; each task is assigned to exactly one instance for execution.
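
For example, when test_server runs on several machines, repeated calls to the same service name should be answered by different instances. A quick way to observe this, using the get_service_name method from section 4.2 (which returns the instance's address):

from astraflux import proxy_call

# Call the same service repeatedly; with several registered instances,
# the returned addresses should vary as the framework balances the load.
for _ in range(5):
    print(proxy_call(service_name='test_server', method_name='get_service_name'))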

7.3 Adjust Worker Quantity

The maximum number of workers for a specified service instance can be adjusted through the following method:

from astraflux import update_max_worker

# Modify the maximum number of workers for the test_server service (127.0.0.1 instance) to 10
update_max_worker(name='test_server', ipaddr='127.0.0.1', max_worker=10)

8. Scheduler API

The framework supports local process/thread task management as well as distributed scheduling:

# Distributed scheduler
from astraflux import add_schedule_job

def test_func(x):
    print(x)

add_schedule_job(
    job_id='test_001',
    cron_expression='*/10 * * * * *',
    function=test_func,
    keyword_arguments={'x': 2},
    execution_type='thread'  # 'thread' or 'process'
)

af.registry(services=[test_server, sub_test_server])
af.start(wait=False)  # wait=False lets the scheduler and asynchronous tasks run alongside the services

# If both the scheduler and asynchronous tasks are enabled, start the distributed scheduler first.

# Asynchronous task executor, supporting processes or threads
from astraflux import gen_thread_executor, gen_process_executor, get_logger

logger = get_logger()
executor = gen_thread_executor(logger=logger, max_workers=20, retry_delay=1)
# executor = gen_process_executor(logger=logger, max_workers=20, retry_delay=1)

# Submit a task to the executor
executor.submit(test_func, 1)

# Start the executor
executor.start()

# Wait for all submitted tasks to complete
executor.wait_completion()

# Shut down the executor
executor.shutdown()

9. API Reference

9.1 interface/definitions.py

  • set_root_path(root_path: str): Sets the root directory for global variables.

  • get_root_path(): Gets the root directory for global variables.

  • set_current_dir(current_dir: str): Sets the current directory for global variables.

  • get_current_dir() -> str | None: Gets the current directory for global variables.

  • set_rabbitmq_uri(uri: str): Sets the RabbitMQ URI for global variables.

  • get_rabbitmq_uri() -> str | None: Gets the RabbitMQ URI for global variables.

  • set_redis_uri(redis_uri: str): Sets the Redis URI for global variables.

  • get_redis_uri() -> str | None: Gets the Redis URI for global variables.

  • set_mongo_uri(mongo_uri: str): Sets the Mongo URI for global variables.

  • get_mongo_uri() -> str | None: Gets the Mongo URI for global variables.

  • set_logs_path(logs_path: str): Sets the logs path for global variables.

  • get_logs_path() -> str | None: Gets the logs path for global variables.

  • set_log_level(log_level: str) -> None: Sets the log level for global variables.

  • get_log_level() -> str | None: Gets the log level for global variables.
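
A short usage sketch (the URI and level values are placeholders, and the functions are assumed to be importable from the package top level like the other interface functions in this document):

from astraflux import set_rabbitmq_uri, get_rabbitmq_uri, set_log_level, get_log_level

# Hypothetical values for illustration only
set_rabbitmq_uri('amqp://scheduleAdmin:scheduleAdminPassword@127.0.0.1:5672/')
set_log_level('INFO')

print(get_rabbitmq_uri())  # the URI set above, or None if never set
print(get_log_level())     # 'INFO'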

9.2 interface/rpc.py

  • generate_unique(): Generates a unique identifier and returns it as a string.

  • remote_call(service_name: str, method_name: str, **params): Makes a remote procedure call to the specified service and method, returns the call result.

  • proxy_call(service_name: str, method_name: str, **params): Makes a remote procedure call to the specified service and method, returns the call result.

  • rpc_decorator(func): RPC function decorator, returns the decorated function.

  • service_running(service_cls): Starts the RabbitMQ consumer, the parameter is the class corresponding to the function to be called when a message is received.
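
A brief sketch of remote_call, which the reference above describes identically to proxy_call (demonstrated in section 6):

from astraflux import remote_call

# Target a service and method by name; parameters are passed as keyword arguments
result = remote_call(service_name='test_server', method_name='test_func', x=1, y=2)
print(result)  # test_func echoes its parameters back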

9.3 interface/rabbitmq.py

  • rabbitmq_send_message(queue: str, message: dict): Sends a message to the specified queue in RabbitMQ; if the message is not a JSON string, it will be converted.

  • rabbitmq_receive_message(queue: str, callback): Consumes messages from the specified queue in RabbitMQ and processes the received messages through a callback function.
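
A minimal send/receive sketch (the queue name is illustrative, and the single-argument callback signature is an assumption based on the description above):

from astraflux import rabbitmq_send_message, rabbitmq_receive_message

def on_message(message):
    # Invoked for each consumed message
    print('received:', message)

# Dict payloads are converted to JSON strings before publishing
rabbitmq_send_message(queue='demo_queue', message={'task_id': '123', 'x': 1})

# Blocks and consumes messages from the queue, passing each one to the callback
rabbitmq_receive_message(queue='demo_queue', callback=on_message)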

9.4 interface/logger.py

  • get_logger(filename: str = None, task_id: str = None) -> logging.Logger: Gets a logger instance, which can specify the log file name and task ID.
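
For example (both arguments are optional; the values here are illustrative):

from astraflux import get_logger

logger = get_logger(filename='test_server', task_id='123456')
logger.info('worker started')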

9.5 interface/executor.py

  • gen_thread_executor(logger, max_workers: int = 5, retry_delay: float = 1.0) -> ThreadPoolExecutorWithRetry: Factory function for creating ThreadPoolExecutorWithRetry instances.

  • gen_process_executor(logger, max_workers: int = 5, retry_delay: float = 1.0) -> ProcessPoolExecutorWithRetry: Factory function for creating ProcessPoolExecutorWithRetry instances.

9.6 interface/utils.py

  • get_date_time_obj(data_str: str, fmt=False, timezone=False): Returns a time object according to the specified timezone and format.

  • format_converted_time(data_str: str, fmt=False, timezone=False, r_fmt=False): Formats the time string according to the specified format and timezone.

  • get_converted_time(fmt=False, timezone=False): Specifies the timezone and format, returns the current time string.

  • convert_timestamp_to_timezone(timestamp, fmt=False, timezone=False): Converts the timestamp to a time string in the specified timezone and format.

  • get_converted_timestamp(date_string: str, fmt=False, timezone=False): Converts the time string to a timestamp in the specified timezone and format.

  • convert_timestamp_to_timezone_str(timestamp, timezone=False, fmt=False): Converts the timestamp to a time string.

  • get_ipaddr() -> str: Retrieves the IP address of the current machine by establishing a UDP connection.
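
A usage sketch (the fmt and timezone values are illustrative; the False defaults presumably fall back to framework-wide settings):

from astraflux import get_converted_time, convert_timestamp_to_timezone, get_ipaddr

# Current time as a formatted string in an explicit timezone
now_str = get_converted_time(fmt='%Y-%m-%d %H:%M:%S', timezone='UTC')

# A Unix timestamp rendered in the same timezone and format
ts_str = convert_timestamp_to_timezone(1700000000, fmt='%Y-%m-%d %H:%M:%S', timezone='UTC')

print(now_str, ts_str, get_ipaddr())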

9.7 interface/snowflake.py

  • snowflake_id() -> str: Generates and returns a snowflake ID string.

9.8 interface/data_access.py

  • MongoDBCollector class: MongoDB collection operation wrapper class, containing methods such as update, array_push, array_pull.

  • task_submit_to_db(queue_name: str, task_data: TaskData, weight: int = DefaultValues.TASK.WEIGHT) -> str: Submits the task to the database (persistence only, no message queue distribution), returns the unique ID of the submitted task.

  • task_submit_to_db_and_mq(queue_name: str, task_data: TaskData, weight: int = DefaultValues.TASK.WEIGHT) -> str: Submits the task to the database and distributes it to RabbitMQ (triggers execution), returns the unique task ID.

  • subtask_batch_create(source_task_id: str, subtask_queue: str, subtask_list: List[TaskData]) -> List[str]: Batch creates subtasks and saves them to the database (linked to the parent task), returns a list of subtask IDs.
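
A sketch contrasting the two submission modes (import paths assumed to match the other examples in this document; the task_data shape follows section 6):

from astraflux import snowflake_id, task_submit_to_db, task_submit_to_db_and_mq

# Persist only: the task is stored in the database but not dispatched
task_id = task_submit_to_db(
    queue_name='test_server',
    task_data={'task_id': snowflake_id()}
)

# Persist and dispatch: the task is stored and immediately queued in RabbitMQ
task_id = task_submit_to_db_and_mq(
    queue_name='test_server',
    task_data={'task_id': snowflake_id()}
)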

9.9 interface/scheduler.py

  • add_schedule_job(job_id: str, cron_expression: str, function: Callable, timezone: str = "UTC", arguments: Optional[List] = None, keyword_arguments: Optional[Dict] = None, allowed_ips: Optional[List[str]] = None, execution_type: str = "thread") -> bool: Schedules a job in the distributed scheduler, returns a boolean indicating success.

  • remove_scheduled_job(job_id: str) -> bool: Removes a scheduled job from the distributed scheduler, returns a boolean indicating success.

  • start_scheduler() -> None: Starts the distributed scheduler.

  • stop_scheduler() -> None: Stops the distributed scheduler.
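
A lifecycle sketch tying these together (the job ID and function are illustrative, and import paths are assumed; add_schedule_job is also shown in section 8):

from astraflux import add_schedule_job, remove_scheduled_job, start_scheduler, stop_scheduler

def heartbeat():
    print('tick')

start_scheduler()

# Run heartbeat every 10 seconds in a thread (6-field cron expression)
added = add_schedule_job(
    job_id='heartbeat_001',
    cron_expression='*/10 * * * * *',
    function=heartbeat,
    execution_type='thread'
)

# ...later, remove the job and stop the scheduler
remove_scheduled_job(job_id='heartbeat_001')
stop_scheduler()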

