Background tasks module for Bazis framework.

Bazis-BG

Background task package for the Bazis framework. Provides an asynchronous task management system with scheduler support, handlers, periodic tasks, and specialized classes for data import/export.

Quick Start

# Install package
uv add bazis-bg

# Create task
from bazis.contrib.bg.basic.base import BgBase

class MyTask(BgBase):
    name = 'My Task'
    
    def handle(self):
        self.next_phase('Processing', expected=100)
        for i in range(100):
            self.progress(d_count=10)

# Run task
task = MyTask.delay()

# Start scheduler
python manage.py bg_scheduler

Description

Bazis-BG is an extension for the Bazis framework that adds background task management. It is built on top of the Django ORM and provides a distributed task processing system with support for:

  • Asynchronous task execution
  • Task scheduler (cron-like)
  • Multiple handlers
  • Progress tracking
  • Specialized classes for data import/export
  • Management through Django admin panel

Core Concept

The background task system in Bazis-BG consists of three main components:

  1. Scheduler (bg_scheduler) - monitors tasks in the database and manages handlers
  2. Handlers (bg_handler) - execute tasks in separate processes
  3. Tasks - classes inheriting from BgBase containing business logic

When an application calls Task.delay(), a record is created in the database. The scheduler discovers the new task and passes it to a free handler. The handler executes the task in an isolated process, updating status and progress in the database.

This approach provides:

  • Task execution isolation
  • Scalability through adding handlers
  • Fault tolerance - tasks are not lost if a handler crashes
  • Transparency - complete information about execution state
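
The flow described above can be pictured with a toy, in-memory model. This is only a sketch: ToyScheduler, TaskRecord and their fields are hypothetical names, and the real package stores tasks in the database and runs handlers as separate processes.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class TaskRecord:
    """Stand-in for a row in the task table (hypothetical fields)."""
    task_id: int
    status: str = "waiting"
    handler: Optional[str] = None


@dataclass
class ToyScheduler:
    """Hands waiting tasks to free handlers, one task per handler."""
    free_handlers: deque = field(default_factory=deque)
    table: list = field(default_factory=list)

    def delay(self) -> TaskRecord:
        # Calling delay() only creates a record; nothing runs yet.
        record = TaskRecord(task_id=len(self.table) + 1)
        self.table.append(record)
        return record

    def tick(self) -> None:
        # One scheduler iteration: match waiting tasks with free handlers.
        for record in self.table:
            if record.status == "waiting" and self.free_handlers:
                record.handler = self.free_handlers.popleft()
                record.status = "running"

    def finish(self, record: TaskRecord) -> None:
        # The handler reports completion and becomes free again.
        record.status = "done"
        self.free_handlers.append(record.handler)
        record.handler = None


scheduler = ToyScheduler(free_handlers=deque(["handler-1"]))
first, second = scheduler.delay(), scheduler.delay()
scheduler.tick()
print(first.status, second.status)  # running waiting
scheduler.finish(first)
scheduler.tick()
print(first.status, second.status)  # done running
```

With a single handler the second task stays in the table until the first one finishes, which is why adding handlers increases throughput.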

Features

  • Distributed processing: Multiple handlers for parallel task execution
  • Scheduler: Support for periodic tasks with cron syntax and intervals
  • Progress tracking: Detailed information about task execution state
  • Execution history: Saving execution phases with timestamps
  • Logging: Built-in logging system for each task
  • Task interruption: Ability to interrupt a running task
  • Transactions: Support for transactional execution
  • Files: Working with files (upload/download)
  • Specialized classes:
    • BgBaseDownload — data export to Excel
    • BgBaseLoad — data import from Excel/ZIP
    • BgBaseDownloadModel — Django model export
  • Healthcheck: Monitoring scheduler and handler health

Advantages Over Other Solutions

  1. Django Integration: Uses Django ORM, migrations, and admin
  2. Ease of Use: Minimal code to create tasks
  3. Transparency: Complete execution information through admin
  4. Specialized Classes: Ready solutions for import/export
  5. Distribution: Easy scaling through adding handlers

Requirements

  • Python: 3.12+
  • Bazis: base package must be installed
  • PostgreSQL: 12+
  • Redis: for caching
  • Dependencies:
    • openpyxl — working with Excel files
    • psutil — process management
    • crontab — parsing cron expressions
    • aiohttp — asynchronous HTTP requests

Note! The current framework implementation requires Redis as the cache backend and PostgreSQL as the database.

Installation

Using uv (recommended)

uv add bazis-bg

Using pip

pip install bazis-bg

For Development

git clone git@github.com:ecofuture-tech/bazis-bg.git
cd bazis-bg
uv sync --dev

Project Setup

  1. Add application to Django settings:
INSTALLED_APPS = [
    # ...
    'bazis.contrib.bg',
    # ...
]
  2. Apply migrations:
python manage.py migrate
  3. Configure environment variables:
# Scheduler
BS_BAZIS_TASK_SCHEDULER_HOST=localhost
BS_BAZIS_TASK_SCHEDULER_PORT=49001
BS_BAZIS_TASK_SCHEDULER_IDLE=0.2

# Handlers
BS_BAZIS_TASK_HANDLERS_LOCAL=5
BS_BAZIS_TASK_HANDLERS_GLOBAL=50
BS_BAZIS_TASK_HANDLER_PORT=49100
BS_BAZIS_TASK_HANDLER_IDLE=0.5

# Storage
BS_BAZIS_TASK_FOLDER=bg

Architecture

System Components

┌─────────────────┐
│   Application   │ ──> delay() ──> Create Task
└─────────────────┘
         │
         ▼
┌─────────────────┐
│   Scheduler     │ ──> Monitor tasks
│  (bg_scheduler) │ ──> Start handlers
└─────────────────┘
         │
         ▼
┌─────────────────┐
│    Handlers     │ ──> Execute Task
│  (bg_handler)   │ ──> Update status
└─────────────────┘

Models

  • Task — main task model
  • TaskCron — periodic task model
  • TaskHandler — handler model

Task States

  1. draft — Draft
  2. waiting — Waiting to start
  3. starting — Starting
  4. running — Running
  5. done — Completed
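
The states can be modelled as an enumeration. The sketch below assumes they are passed through strictly in the order listed; the package may allow other transitions, and TaskState and next_state are illustrative names, not part of its API.

```python
from enum import Enum


class TaskState(str, Enum):
    DRAFT = "draft"
    WAITING = "waiting"
    STARTING = "starting"
    RUNNING = "running"
    DONE = "done"


# Assumed linear order, taken from the numbered list above.
ORDER = list(TaskState)


def next_state(state):
    """Return the state that follows `state`, or None after DONE."""
    index = ORDER.index(state)
    return ORDER[index + 1] if index + 1 < len(ORDER) else None


print(next_state(TaskState.WAITING).value)  # starting
```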

Package Structure

bazis.contrib.bg/
├── basic/
│   ├── base.py              # Base BgBase class
│   ├── base_download.py     # Data export class
│   ├── base_download_model.py  # Django model export
│   └── base_load.py         # Data import class
├── models_abstract.py       # Abstract models
├── models.py                # Concrete models
├── admin.py                 # Django admin
└── management/
    └── commands/
        ├── bg_scheduler.py  # Scheduler command
        └── bg_handler.py    # Handler command

Usage

Creating Basic Task

from bazis.contrib.bg.basic.base import BgBase

class DataProcessingTask(BgBase):
    # Human-readable task name
    name = 'Data Processing'
    
    # Maximum number of instances of this task running at the same time
    parallel = 2
    
    # Tasks that block execution
    blocked = ['myapp.tasks.AnotherTask']
    
    # Execute in transaction
    is_transaction = True
    
    def pre_handle(self):
        """Called before handle()"""
        self.log.info('Preliminary preparation')
    
    def handle(self):
        """Main task logic"""
        # Set phase with expectation
        self.next_phase('Loading data', expected=1000)
        
        for i in range(1000):
            # Your logic
            
            # Update progress
            self.progress(
                performed=i+1,  # Performed
                d_time=3,       # Sync every 3 sec
                d_count=100     # Or every 100 operations
            )
        
        # Next phase
        self.next_phase('Saving results', expected=100)
        
        # ... save logic
        
        # Save result
        self.set_result({'processed': 1000, 'saved': 100})
    
    def post_handle(self):
        """Called after handle()"""
        self.log.info('Finalization')
    
    def excepting(self):
        """Exception handling"""
        self.log.error('An error occurred')
    
    def interrupting(self):
        """Interruption handling"""
        self.log.warning('Task interrupted by user')
    
    def finishing(self):
        """Always executed at the end"""
        self.log.info('Resource cleanup')

Running Tasks

Asynchronous Launch

# Basic launch
task = DataProcessingTask.delay()

# With parameters
task = DataProcessingTask.delay(
    param1='value1',
    param2='value2'
)

# With custom name
task = DataProcessingTask.delay(
    name='January data processing',
    param1='value1'
)

# With file
with open('data.xlsx', 'rb') as fp:
    task = DataProcessingTask.delay(
        fp=fp,
        param1='value1'
    )

# With author
task = DataProcessingTask.delay(
    author=request.user,
    param1='value1'
)

# With environment variables
task = DataProcessingTask.delay(
    envs={'MY_VAR': 'value'},
    param1='value1'
)

Synchronous Launch

# Waits for task completion
task = DataProcessingTask.delay_sync(param1='value1')

Periodic Tasks

Through Django Admin

  1. Open "Periodic Tasks" section
  2. Create new task:
    • Name: Task description
    • Task Class: Full path (e.g., myapp.tasks.DataProcessingTask)
    • Period: Cron expression or seconds
    • Args/Kwargs: JSON with parameters
    • Enabled: Yes

Period Examples

# Every 60 seconds
period = "60"

# Every day at 03:00
period = "0 3 * * *"

# Every Monday at 09:00
period = "0 9 * * 1"

# Every hour
period = "0 * * * *"

# Every 5 minutes
period = "*/5 * * * *"
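
A period is therefore either a plain number of seconds or a five-field cron expression. The helper below shows one way to tell the two apart using only the standard library. It is a sketch: parse_period is a hypothetical name, the package itself relies on the crontab dependency for real cron parsing, and the field values are not validated here.

```python
def parse_period(period: str):
    """Classify a period string as an interval in seconds or a cron expression."""
    value = period.strip()
    if value.isdecimal():
        return ("interval", int(value))
    fields = value.split()
    if len(fields) != 5:
        raise ValueError(
            f"expected seconds or a 5-field cron expression, got {period!r}"
        )
    names = ("minute", "hour", "day_of_month", "month", "day_of_week")
    return ("cron", dict(zip(names, fields)))


print(parse_period("60"))                    # ('interval', 60)
print(parse_period("0 9 * * 1")[1]["hour"])  # 9
```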

Data Export

Base Export Class

from bazis.contrib.bg.basic.base_download import BgBaseDownload

from myapp.models import MyModel  # hypothetical model used in this example

class ReportDownload(BgBaseDownload):
    name = 'Report Export'
    file_name = 'report'
    
    # Fields to export
    fields_read = ['id', 'name', 'created_at', 'status']
    
    # Fields marked bold in header
    fields_marked = {'name', 'status'}
    
    # Column sizes
    fields_size = {
        'id': 10,
        'name': 30,
        'created_at': 20,
        'status': 15
    }
    
    # Row limit per file (for splitting into archive)
    limit_rows_split = 500000
    
    def get_titles(self):
        """Returns column headers"""
        return {
            'id': 'ID',
            'name': 'Name',
            'created_at': 'Created At',
            'status': 'Status'
        }
    
    def get_queryset(self):
        """Returns data for export"""
        return MyModel.objects.all().values(*self.fields_read)
    
    def get_count(self):
        """Returns number of records"""
        return MyModel.objects.count()
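
The limit_rows_split attribute suggests that a large export is cut into several files before archiving. How the package does this internally is not shown here; the sketch below only illustrates the chunking idea, and split_rows is a hypothetical helper.

```python
from itertools import islice


def split_rows(rows, limit):
    """Yield lists of at most `limit` rows, preserving order."""
    if limit < 1:
        raise ValueError("limit must be positive")
    iterator = iter(rows)
    # islice pulls the next `limit` items; an empty list ends the loop.
    while chunk := list(islice(iterator, limit)):
        yield chunk


print([len(part) for part in split_rows(range(12), 5)])  # [5, 5, 2]
```

Because the rows are consumed lazily, only one chunk is held in memory at a time.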

Export with Grouped Headers

def get_titles(self):
    return {
        'groups': [
            {
                'name_union': 'Basic Information',
                'columns': ['id', 'name'],
                'marked': True
            },
            {
                'name_union': 'Dates',
                'columns': ['created_at', 'updated_at']
            }
        ],
        'titles': {
            'id': 'ID',
            'name': 'Name',
            'created_at': 'Created',
            'updated_at': 'Updated'
        }
    }
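
To see how such a structure maps onto a two-row header, the sketch below flattens it into a group row and a title row. header_rows is a hypothetical helper; the actual Excel rendering (merged cells, bold marking) is done by the package and may differ.

```python
def header_rows(fields, titles):
    """Build a group row and a title row for `fields` from a grouped titles dict."""
    group_of = {}
    for group in titles.get("groups", []):
        for column in group["columns"]:
            group_of[column] = group["name_union"]
    # Columns outside any group get an empty cell in the top row.
    top = [group_of.get(name, "") for name in fields]
    bottom = [titles["titles"].get(name, name) for name in fields]
    return top, bottom


titles = {
    "groups": [
        {"name_union": "Basic Information", "columns": ["id", "name"]},
        {"name_union": "Dates", "columns": ["created_at", "updated_at"]},
    ],
    "titles": {"id": "ID", "name": "Name",
               "created_at": "Created", "updated_at": "Updated"},
}
top, bottom = header_rows(["id", "name", "created_at", "updated_at"], titles)
print(top)     # ['Basic Information', 'Basic Information', 'Dates', 'Dates']
print(bottom)  # ['ID', 'Name', 'Created', 'Updated']
```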

Django Model Export

from bazis.contrib.bg.basic.base_download_model import BgBaseDownloadModel
from django.apps import apps

class VehicleDownload(BgBaseDownloadModel):
    model = apps.get_model('myapp.Vehicle')
    fields_read = ['id', 'gnum', 'vehicle_model__brand__name', 'country__name']
    
    # verbose_name of fields will be picked up automatically

Data Import

from bazis.contrib.bg.basic.base_load import BgBaseLoad

from myapp.models import MyModel  # hypothetical model used in this example

class DataImport(BgBaseLoad):
    name = 'Data Import'
    
    # Fields in column order
    fields_read = ['name', 'price', 'quantity']
    
    # Skip first row (headers)
    skip_titles = True
    
    # Progress update period
    progress_count = 1000
    
    # Track row grouping
    groups_check = False
    
    # Add meta-data (_meta)
    with_meta = False
    
    def load(self, reader):
        """
        Process loaded data
        
        reader - generator of dictionaries with data
        """
        self.next_phase('Saving data')
        
        for row in reader:
            # row = {'name': '...', 'price': '...', 'quantity': '...'}
            
            # Create object
            MyModel.objects.create(
                name=row['name'],
                price=float(row['price']),
                quantity=int(row['quantity'])
            )
            
            self.progress(d_count=100)

Task API

Task Management Methods

# Get task object
task = self.get_task()

# Update task data
self.task_update(
    phase='New phase',
    expected=1000,
    performed=500
)

# Save file in task
with open('result.xlsx', 'rb') as fp:
    self.task_file_save('result.xlsx', fp, file_params={'rows': 1000})

# Get file from task
fp = self.task_file_get()

# Set result
self.set_result({'status': 'success', 'count': 1000})

# Force synchronization
self.flush()

Working with Phases

# Simple phase
self.next_phase('Processing')

# Phase with expectation (number)
self.next_phase('Loading files', expected=100)

# Phase with expectation (dictionary)
self.next_phase('Processing', expected={
    'files': 10,
    'records': 1000
})

# Update progress
self.progress(
    performed=50,           # Performed
    expected=100,          # Expected (can be updated)
    d_time=5,              # Sync every 5 sec
    d_count=100,           # Or every 100 operations
    need_sync=True         # Force synchronization
)

# Progress with dictionary
self.progress(
    performed={'files': 5, 'records': 500},
    d_count=10
)
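
The d_time and d_count arguments throttle how often progress is written out. The sketch below shows one reading of those semantics, inferred from the comments above: a sync is due after d_count operations or d_time seconds, whichever comes first. ProgressThrottle is a hypothetical class, not the package's implementation.

```python
import time


class ProgressThrottle:
    """Decides when accumulated progress should be written out."""

    def __init__(self, d_time=None, d_count=None, clock=time.monotonic):
        self.d_time = d_time
        self.d_count = d_count
        self.clock = clock
        self.pending = 0
        self.last_sync = clock()

    def step(self, need_sync=False):
        """Record one operation; return True when a sync is due."""
        self.pending += 1
        now = self.clock()
        due = (
            need_sync
            or (self.d_count is not None and self.pending >= self.d_count)
            or (self.d_time is not None and now - self.last_sync >= self.d_time)
        )
        if due:
            # Reset both counters so the next window starts from here.
            self.pending = 0
            self.last_sync = now
        return due


throttle = ProgressThrottle(d_count=100)
print(sum(throttle.step() for _ in range(1000)))  # 10
```

Batching like this keeps a tight loop from issuing one database write per item.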

Logging

# Use self.log to write to task log
self.log.info('Information message')
self.log.warning('Warning')
self.log.error('Error')
self.log.debug('Debug information')

# Logs are automatically saved in task.log field

Configuration

Environment Variables

# Scheduler
BS_BAZIS_TASK_SCHEDULER_HOST=localhost  # Scheduler host
BS_BAZIS_TASK_SCHEDULER_PORT=49001      # Scheduler healthcheck port
BS_BAZIS_TASK_SCHEDULER_IDLE=0.2        # Delay between iterations, sec

# Handlers
BS_BAZIS_TASK_HANDLERS_LOCAL=5          # Number of local handlers
BS_BAZIS_TASK_HANDLERS_GLOBAL=50        # Global number of handlers
BS_BAZIS_TASK_HANDLER_PORT=49100        # Base port for handlers
BS_BAZIS_TASK_HANDLER_IDLE=0.5          # Delay between task checks, sec

# Storage
BS_BAZIS_TASK_FOLDER=bg                 # Folder for task files
BS_BAZIS_STORAGE_BG=None                # Storage class (default FileSystemStorage)
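
Because these values arrive as strings, a malformed port or interval may only surface at startup. The optional pre-flight check below is a sketch: read_task_env and the type table are assumptions derived from the listing above, not part of the package.

```python
import os

# Expected type of each variable, inferred from the example values above.
CASTS = {
    "BS_BAZIS_TASK_SCHEDULER_HOST": str,
    "BS_BAZIS_TASK_SCHEDULER_PORT": int,
    "BS_BAZIS_TASK_SCHEDULER_IDLE": float,
    "BS_BAZIS_TASK_HANDLERS_LOCAL": int,
    "BS_BAZIS_TASK_HANDLERS_GLOBAL": int,
    "BS_BAZIS_TASK_HANDLER_PORT": int,
    "BS_BAZIS_TASK_HANDLER_IDLE": float,
    "BS_BAZIS_TASK_FOLDER": str,
}


def read_task_env(environ=None):
    """Parse the variables that are set; raise ValueError listing bad ones."""
    environ = os.environ if environ is None else environ
    parsed, errors = {}, []
    for name, cast in CASTS.items():
        raw = environ.get(name)
        if raw is None:
            continue  # unset: whatever default the package uses applies
        try:
            parsed[name] = cast(raw)
        except ValueError:
            errors.append(f"{name}={raw!r} is not a valid {cast.__name__}")
    if errors:
        raise ValueError("; ".join(errors))
    return parsed
```

Calling read_task_env() before starting the scheduler reports every malformed variable at once.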

Django Settings

# settings.py

# Number of local handlers
BAZIS_TASK_HANDLERS_LOCAL = 3

# Global number of handlers
BAZIS_TASK_HANDLERS_GLOBAL = 50

# Base port for handlers
BAZIS_TASK_HANDLER_PORT = 8100

# Scheduler task check interval (seconds)
BAZIS_TASK_SCHEDULER_IDLE = 1

# Handler task check interval (seconds)
BAZIS_TASK_HANDLER_IDLE = 1

# Custom storage for task files
BAZIS_STORAGE_BG = 'myapp.storage.S3Storage'

Task Management

Through Django Admin

  1. Background Tasks (Task)

    • View all tasks
    • Filter by state, date
    • Interrupt running tasks
    • View logs and results
    • Download result files
  2. Periodic Tasks (TaskCron)

    • Create/edit periodic tasks
    • Enable/disable
    • View next run time
  3. Handlers (TaskHandler)

    • View active handlers
    • Monitor load

Programmatically

from django.apps import apps

Task = apps.get_model('bg.Task')

# Get task
task = Task.objects.get(id=task_id)

# Interrupt task
task.interrupt = True
task.save()

# Check state
if task.is_done:
    if task.is_success:
        print('Successfully completed')
    elif task.is_error:
        print(f'Error: {task.error}')
    elif task.is_interrupt:
        print('Interrupted')

# Get result
result = task.result

# Download file
if task.file:
    with task.file.open('rb') as fp:
        data = fp.read()
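
When the caller needs the result but should not block inside delay_sync, the task can be polled. The helper below is a sketch: wait_until_done is a hypothetical function that relies only on the is_done attribute shown above, and the fetch callable is supplied by the caller.

```python
import time


def wait_until_done(fetch_task, timeout=60.0, interval=1.0,
                    sleep=time.sleep, clock=time.monotonic):
    """Poll `fetch_task()` until the returned object has is_done set."""
    deadline = clock() + timeout
    while True:
        task = fetch_task()  # re-read the task so fresh state is seen
        if task.is_done:
            return task
        if clock() >= deadline:
            raise TimeoutError("task did not finish in time")
        sleep(interval)
```

In a Django project the fetch callable could be lambda: Task.objects.get(id=task_id), so that each poll reads the current row.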

Running Services

# Start scheduler
python manage.py bg_scheduler

# Start handler manually
python manage.py bg_handler --host 127.0.0.1 --port 8100

# In production, it's recommended to use supervisor or systemd

Examples

Example 1: Simple Task with Progress

from bazis.contrib.bg.basic.base import BgBase

class SimpleTask(BgBase):
    name = 'Simple Task'
    
    def handle(self):
        self.next_phase('Processing', expected=100)
        
        for i in range(100):
            # Your logic
            self.log.info(f'Processing item {i+1}')
            self.progress(d_count=10)

# Launch
task = SimpleTask.delay()

Example 2: Task with File

from io import BytesIO

from bazis.contrib.bg.basic.base import BgBase

class FileProcessingTask(BgBase):
    name = 'File Processing'
    
    def handle(self):
        # Get the file passed to delay(fp=...)
        fp = self.task_file_get()
        if not fp:
            raise ValueError('File not provided')
        
        self.next_phase('Reading file')
        data = fp.read()
        
        # Processing...
        result = self.process_data(data)
        
        # Save the result as the task file
        self.task_file_save('result.txt', BytesIO(result))
    
    def process_data(self, data):
        # Your processing logic
        return data.upper()

# Launch with file
with open('input.txt', 'rb') as fp:
    task = FileProcessingTask.delay(fp=fp)

Example 3: Report Export

from bazis.contrib.bg.basic.base_download_model import BgBaseDownloadModel
from django.apps import apps

class MonthlyReport(BgBaseDownloadModel):
    model = apps.get_model('myapp.Order')
    fields_read = ['id', 'number', 'customer__name', 'total', 'created_at']
    fields_marked = {'total'}
    file_name = 'monthly_report'

# Launch
task = MonthlyReport.delay()

# Get file after completion (re-read the task to see its final state)
Task = apps.get_model('bg.Task')
task = Task.objects.get(id=task.id)
if task.is_success:
    file_url = task.file.url

Example 4: Data Import

from bazis.contrib.bg.basic.base_load import BgBaseLoad
from django.apps import apps

class ProductImport(BgBaseLoad):
    name = 'Product Import'
    fields_read = ['name', 'sku', 'price', 'quantity']
    skip_titles = True
    
    def load(self, reader):
        Product = apps.get_model('myapp.Product')
        
        self.next_phase('Importing products')
        
        created = 0
        updated = 0
        
        for row in reader:
            product, is_created = Product.objects.update_or_create(
                sku=row['sku'],
                defaults={
                    'name': row['name'],
                    'price': row['price'],
                    'quantity': row['quantity']
                }
            )
            
            if is_created:
                created += 1
            else:
                updated += 1
            
            self.progress(d_count=100)
        
        self.set_result({'created': created, 'updated': updated})

# Launch with file
with open('products.xlsx', 'rb') as fp:
    task = ProductImport.delay(fp=fp)

Example 5: Task with Parameters

from bazis.contrib.bg.basic.base import BgBase

class SendEmailTask(BgBase):
    name = 'Send Email'
    parallel = 5  # Maximum 5 simultaneous sends
    
    def handle(self, email_list, subject, message):
        """
        Args:
            email_list: list of email addresses
            subject: email subject
            message: email body
        """
        self.next_phase('Sending emails', expected=len(email_list))
        
        for i, email in enumerate(email_list):
            try:
                # Send email
                self.send_email(email, subject, message)
                self.log.info(f'Sent to {email}')
            except Exception as e:
                self.log.error(f'Error sending to {email}: {e}')
            
            self.progress(performed=i+1, d_count=10)
    
    def send_email(self, email, subject, message):
        from django.core.mail import send_mail
        send_mail(subject, message, 'noreply@example.com', [email])

# Launch
emails = ['user1@example.com', 'user2@example.com']
task = SendEmailTask.delay(
    email_list=emails,
    subject='Important notification',
    message='Message text'
)

Example 6: Task with Locks

from bazis.contrib.bg.basic.base import BgBase

class DataExportTask(BgBase):
    name = 'Data Export'
    parallel = 1  # Only one task simultaneously
    blocked = [
        'myapp.tasks.DataImportTask',  # Blocked by import
        'myapp.tasks.DataUpdateTask'   # Blocked by update
    ]
    
    def handle(self):
        self.next_phase('Exporting data')
        # Export logic
        pass

class DataImportTask(BgBase):
    name = 'Data Import'
    parallel = 1
    blocked = [
        'myapp.tasks.DataExportTask',
        'myapp.tasks.DataUpdateTask'
    ]
    
    def handle(self):
        self.next_phase('Importing data')
        # Import logic
        pass
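
The interplay of parallel and blocked can be expressed as a single check. The sketch below assumes that a task may start only while fewer than parallel instances of its own class are running and none of the classes in blocked is running; can_start is a hypothetical helper, and the scheduler's actual rule may differ.

```python
from collections import Counter


def can_start(task_path, parallel, blocked, running):
    """Decide whether a task may start, given the paths of running tasks."""
    counts = Counter(running)
    if counts[task_path] >= parallel:
        return False  # already at the per-class limit
    # Any running task from the blocked list prevents the start.
    return not any(counts[path] for path in blocked)


export = "myapp.tasks.DataExportTask"
blockers = ["myapp.tasks.DataImportTask", "myapp.tasks.DataUpdateTask"]
print(can_start(export, 1, blockers, []))                              # True
print(can_start(export, 1, blockers, ["myapp.tasks.DataImportTask"]))  # False
print(can_start(export, 1, blockers, [export]))                        # False
```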

Development

Setting Up Development Environment

# Clone repository
git clone git@github.com:ecofuture-tech/bazis-bg.git
cd bazis-bg

# Install dependencies
uv sync --dev

# Run tests
pytest

# Check code
ruff check .

# Format code
ruff format .

Test Structure

tests/
├── test_base.py              # Base class tests
├── test_download.py          # Export tests
├── test_load.py              # Import tests
├── test_scheduler.py         # Scheduler tests
└── test_handler.py           # Handler tests

Contributing

We welcome your contributions! Here's how you can help:

  1. Fork the repository
  2. Create a branch for new functionality (git checkout -b feature/amazing-feature)
  3. Make changes
  4. Run tests (pytest)
  5. Commit changes (git commit -m 'Add amazing feature')
  6. Push to branch (git push origin feature/amazing-feature)
  7. Open Pull Request

Please ensure that:

  • Tests are written for new functionality
  • Documentation is updated
  • Existing code style is maintained
  • Changes are added to changelog

License

Apache License 2.0

See LICENSE file for details.

Package Ecosystem

Bazis supports extensions through additional packages:

  • bazis — framework core
  • bazis-bg — background task system
  • bazis-test-utils — testing utilities
  • bazis-<n> — other extensions (add bazis- prefix to name)

All extension packages require installation of the base bazis package.


Made with ❤️ by Bazis team
