
AstraFlux User Documentation

AstraFlux enables rapid setup of distributed task systems with minimal configuration. Key features:
1. Asynchronous and scheduled tasks
2. Distributed task processing
3. Service registration, monitoring, dynamic configuration injection, and load balancing

Framework Initialization

1. Create file config.yaml

Mongodb:
  host: 127.0.0.1
  port: 27017
  db: astraflux
  username: scheduleAdmin
  password: scheduleAdminPassword

Redis:
  host: 127.0.0.1
  port: 6379
  password: scheduleAdminPassword

RabbitMQ:
  host: 127.0.0.1
  port: 5672
  username: scheduleAdmin
  password: scheduleAdminPassword

logger:
  level: INFO
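Before starting the framework, it can help to sanity-check that config.yaml parses and contains the sections shown above. A minimal sketch of such a check (the section names are taken from the example file; parsing the YAML itself would typically be done with `yaml.safe_load` from PyYAML, which is not part of the standard library):

```python
REQUIRED_SECTIONS = ("Mongodb", "Redis", "RabbitMQ", "logger")

def check_config(cfg: dict) -> dict:
    """Verify that a parsed config.yaml contains the expected top-level sections."""
    missing = [s for s in REQUIRED_SECTIONS if s not in cfg]
    if missing:
        raise ValueError(f"config.yaml is missing sections: {missing}")
    return cfg
```

Calling this on the parsed config before constructing AstraFlux turns a vague connection failure at runtime into an immediate, descriptive error.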

2. Initialize in main.py

import os
from nexusflow import *  # assumed to export AstraFlux, matching the imports used below

os_dir = os.path.dirname(__file__)
af = AstraFlux('config.yaml', os_dir)

Service Registration

  1. Create service file (e.g., test_server.py):

# -*- coding: utf-8 -*-

from nexusflow import *


class RpcFunction(ServiceConstructor):
    """All public methods are automatically proxied for RPC calls."""

    service_name = 'test_server'

    def get_service_name(self):
        return {"service_version": self.service_version}

    def test_func(self, **args):
        return args

class WorkerFunction(WorkerConstructor):
    worker_name = 'test_server'

    def run(self, data):
        """
        Executed when a new task appears in the worker_name queue.
        Implement business logic here; `data` contains all task data.
        """
        self.loguru.info(data)
  2. Register the service in main.py:

import test_server
af.registry(services=[test_server])

af.start()

Scheduled/Asynchronous Tasks

from nexusflow.interface import *

# Generate Snowflake ID
_id = snowflake_id()

# Create task
message = {'task_id': 'test_003', 'status': 'wait', 'name': 'xxxx'}
task_submit_databases(queue='test_server', message=message)

# Create subtasks (automatic status updates)
subtask_create(
    source_task_id='test_003',
    subtask_queue='test_server_sub',
    subtasks=[{
        'task_id': snowflake_id(),
        'name': 'subtask1',
    }]
)

# Stop task
task_stop(task_id='test_003')

# MongoDB interfaces
mongodb_task()     # Task operations
mongodb_node()     # Node operations
mongodb_services() # Service operations

# Redis interfaces
redis_task()
redis_services()

# RPC service call (auto load-balanced)
result = proxy_call(
    service_name='test_server',
    method_name='test_func',
    a=1, b=2  # Function arguments
)
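`snowflake_id()` returns a time-sortable 64-bit integer. The sketch below implements the classic Snowflake layout (41-bit millisecond timestamp, 10-bit node id, 12-bit sequence) to show why such IDs are unique and ordered; it illustrates the general technique, and is an assumption rather than AstraFlux's exact bit layout:

```python
import threading
import time

class Snowflake:
    """Minimal Snowflake-style generator: 41-bit timestamp | 10-bit node | 12-bit sequence."""

    def __init__(self, node_id=0, epoch_ms=1288834974657):
        assert 0 <= node_id < 1024
        self.node_id = node_id
        self.epoch_ms = epoch_ms
        self.last_ms = -1
        self.sequence = 0
        self._lock = threading.Lock()

    def next_id(self):
        with self._lock:
            now = int(time.time() * 1000)
            if now == self.last_ms:
                self.sequence = (self.sequence + 1) & 0xFFF
                if self.sequence == 0:          # sequence exhausted for this millisecond
                    while now <= self.last_ms:  # spin until the clock advances
                        now = int(time.time() * 1000)
            else:
                self.sequence = 0
            self.last_ms = now
            return ((now - self.epoch_ms) << 22) | (self.node_id << 12) | self.sequence
```

Because the timestamp occupies the high bits, IDs generated on one node are strictly increasing, which makes them safe to use as sortable task IDs.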

