Background tasks module for Bazis framework.
Bazis-BG
Background task package for the Bazis framework. Provides an asynchronous task management system with scheduler support, handlers, periodic tasks, and specialized classes for data import/export.
Quick Start
# Install package
uv add bazis-bg
# Create task
from bazis.contrib.bg.basic.base import BgBase
class MyTask(BgBase):
    name = 'My Task'

    def handle(self):
        self.next_phase('Processing', expected=100)
        for i in range(100):
            self.progress(d_count=10)
# Run task
task = MyTask.delay()
# Start scheduler
python manage.py bg_scheduler
Table of Contents
- Description
- Core Concept
- Features
- Requirements
- Installation
- Architecture
- Usage
- Task API
- Configuration
- Task Management
- Examples
- Development
- Contributing
- License
- Links
Description
Bazis-BG is an extension for the Bazis framework that adds background task management. It is built on top of the Django ORM and distributes task processing across multiple handler processes, with support for:
- Asynchronous task execution
- Task scheduler (cron-like)
- Multiple handlers
- Progress tracking
- Specialized classes for data import/export
- Management through Django admin panel
Core Concept
The background task system in Bazis-BG consists of three main components:
- Scheduler (bg_scheduler) - monitors tasks in the database and manages handlers
- Handlers (bg_handler) - execute tasks in separate processes
- Tasks - classes inheriting from BgBase containing business logic
When an application calls Task.delay(), a record is created in the database. The scheduler discovers the new task and passes it to a free handler. The handler executes the task in an isolated process, updating status and progress in the database.
This approach provides:
- Task execution isolation
- Scalability through adding handlers
- Fault tolerance - tasks are not lost if a handler crashes
- Transparency - complete information about execution state
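The flow described above (delay() records a task, the scheduler hands it to a free handler, the handler updates its status) can be pictured with a small in-memory model. This is only an illustrative sketch: `TaskRecord` and `ToyScheduler` are hypothetical names, and the real package stores tasks in the database and runs handlers as separate processes.

```python
from __future__ import annotations

from collections import deque
from dataclasses import dataclass, field


@dataclass
class TaskRecord:
    """Stands in for a row in the task table (hypothetical fields)."""
    task_id: int
    state: str = "waiting"
    handler: str | None = None


@dataclass
class ToyScheduler:
    """Pairs waiting tasks with free handlers, oldest task first."""
    free_handlers: deque = field(default_factory=deque)
    queue: deque = field(default_factory=deque)

    def delay(self, task_id: int) -> TaskRecord:
        # Calling delay() only records the task; nothing runs yet.
        record = TaskRecord(task_id)
        self.queue.append(record)
        return record

    def tick(self) -> list:
        # One scheduler iteration: start as many tasks as there are free handlers.
        started = []
        while self.queue and self.free_handlers:
            record = self.queue.popleft()
            record.handler = self.free_handlers.popleft()
            record.state = "running"
            started.append(record)
        return started

    def finish(self, record: TaskRecord) -> None:
        # The handler reports completion and becomes free again.
        record.state = "done"
        self.free_handlers.append(record.handler)


scheduler = ToyScheduler(free_handlers=deque(["handler-1"]))
first = scheduler.delay(1)
second = scheduler.delay(2)
scheduler.tick()  # one free handler, so only the first task starts
```

With a single handler the second task stays in the waiting state until the first one finishes, which is why adding handlers increases throughput.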
Features
- Distributed processing: Multiple handlers for parallel task execution
- Scheduler: Support for periodic tasks with cron syntax and intervals
- Progress tracking: Detailed information about task execution state
- Execution history: Saving execution phases with timestamps
- Logging: Built-in logging system for each task
- Task interruption: Ability to interrupt a running task
- Transactions: Support for transactional execution
- Files: Working with files (upload/download)
- Specialized classes:
  - BgBaseDownload - data export to Excel
  - BgBaseLoad - data import from Excel/ZIP
  - BgBaseDownloadModel - Django model export
- Healthcheck: Monitoring scheduler and handler health
Advantages Over Other Solutions
- Django Integration: Uses Django ORM, migrations, and admin
- Ease of Use: Minimal code to create tasks
- Transparency: Complete execution information through admin
- Specialized Classes: Ready solutions for import/export
- Distribution: Easy scaling through adding handlers
Requirements
- Python: 3.12+
- Bazis: base package must be installed
- PostgreSQL: 12+
- Redis: for caching
- Dependencies:
  - openpyxl - working with Excel files
  - psutil - process management
  - crontab - parsing cron expressions
  - aiohttp - asynchronous HTTP requests
Note! The current framework implementation requires Redis as the cache backend and PostgreSQL as the database.
Installation
Using uv (recommended)
uv add bazis-bg
Using pip
pip install bazis-bg
For Development
git clone git@github.com:ecofuture-tech/bazis-bg.git
cd bazis-bg
uv sync --dev
Project Setup
- Add application to Django settings:
INSTALLED_APPS = [
    # ...
    'bazis.contrib.bg',
    # ...
]
- Apply migrations:
python manage.py migrate
- Configure environment variables:
# Scheduler
BS_BAZIS_TASK_SCHEDULER_HOST=localhost
BS_BAZIS_TASK_SCHEDULER_PORT=49001
BS_BAZIS_TASK_SCHEDULER_IDLE=0.2
# Handlers
BS_BAZIS_TASK_HANDLERS_LOCAL=5
BS_BAZIS_TASK_HANDLERS_GLOBAL=50
BS_BAZIS_TASK_HANDLER_PORT=49100
BS_BAZIS_TASK_HANDLER_IDLE=0.5
# Storage
BS_BAZIS_TASK_FOLDER=bg
Architecture
System Components
┌─────────────────┐
│ Application │ ──> delay() ──> Create Task
└─────────────────┘
│
▼
┌─────────────────┐
│ Scheduler │ ──> Monitor tasks
│ (bg_scheduler) │ ──> Start handlers
└─────────────────┘
│
▼
┌─────────────────┐
│ Handlers │ ──> Execute Task
│ (bg_handler) │ ──> Update status
└─────────────────┘
Models
- Task — main task model
- TaskCron — periodic task model
- TaskHandler — handler model
Task States
- draft - Draft
- waiting - Waiting to start
- starting - Starting
- running - Running
- done - Completed
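The five states can be modelled as an enumeration. The sketch below assumes the states are passed through strictly in the listed order; that ordering, and the names `TaskState` and `next_state`, are assumptions made for illustration and are not taken from the package.

```python
from __future__ import annotations

from enum import Enum


class TaskState(str, Enum):
    DRAFT = "draft"
    WAITING = "waiting"
    STARTING = "starting"
    RUNNING = "running"
    DONE = "done"


# Assumed linear progression; the real package may allow other transitions.
_ORDER = list(TaskState)


def next_state(state: TaskState) -> TaskState | None:
    """Return the state that follows `state`, or None after DONE."""
    index = _ORDER.index(state)
    return _ORDER[index + 1] if index + 1 < len(_ORDER) else None
```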
Package Structure
bazis.contrib.bg/
├── basic/
│ ├── base.py # Base BgBase class
│ ├── base_download.py # Data export class
│ ├── base_download_model.py # Django model export
│ └── base_load.py # Data import class
├── models_abstract.py # Abstract models
├── models.py # Concrete models
├── admin.py # Django admin
└── management/
└── commands/
├── bg_scheduler.py # Scheduler command
└── bg_handler.py # Handler command
Usage
Creating Basic Task
from bazis.contrib.bg.basic.base import BgBase

class DataProcessingTask(BgBase):
    # Human-readable task name
    name = 'Data Processing'
    # Number of simultaneously running tasks
    parallel = 2
    # Tasks that block execution
    blocked = ['myapp.tasks.AnotherTask']
    # Execute in transaction
    is_transaction = True

    def pre_handle(self):
        """Called before handle()"""
        self.log.info('Preliminary preparation')

    def handle(self):
        """Main task logic"""
        # Set phase with expectation
        self.next_phase('Loading data', expected=1000)
        for i in range(1000):
            # Your logic
            # Update progress
            self.progress(
                performed=i+1,  # Performed
                d_time=3,       # Sync every 3 sec
                d_count=100     # Or every 100 operations
            )
        # Next phase
        self.next_phase('Saving results', expected=100)
        # ... save logic
        # Save result
        self.set_result({'processed': 1000, 'saved': 100})

    def post_handle(self):
        """Called after handle()"""
        self.log.info('Finalization')

    def excepting(self):
        """Exception handling"""
        self.log.error('An error occurred')

    def interrupting(self):
        """Interruption handling"""
        self.log.warning('Task interrupted by user')

    def finishing(self):
        """Always executed at the end"""
        self.log.info('Resource cleanup')
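The docstrings above suggest an order in which the hooks run. One plausible reading is sketched below as a standalone class that only records the call order. `HookOrderDemo`, `Interrupted`, and the `run()` method are hypothetical, and the exact control flow inside BgBase is an assumption.

```python
class Interrupted(Exception):
    """Hypothetical signal raised when a user interrupts the task."""


class HookOrderDemo:
    """Records which hooks run; this is not the real BgBase."""

    def __init__(self, fail=False, interrupt=False):
        self.fail = fail
        self.interrupt = interrupt
        self.calls = []

    def pre_handle(self):
        self.calls.append("pre_handle")

    def handle(self):
        self.calls.append("handle")
        if self.interrupt:
            raise Interrupted()
        if self.fail:
            raise ValueError("simulated failure")

    def post_handle(self):
        self.calls.append("post_handle")

    def excepting(self):
        self.calls.append("excepting")

    def interrupting(self):
        self.calls.append("interrupting")

    def finishing(self):
        self.calls.append("finishing")

    def run(self):
        # Assumed flow: the happy path runs pre/handle/post, errors and
        # interruptions go to their own hook, and finishing() always runs.
        try:
            self.pre_handle()
            self.handle()
            self.post_handle()
        except Interrupted:
            self.interrupting()
        except Exception:
            self.excepting()
        finally:
            self.finishing()
        return self.calls
```

Under this reading, cleanup that must always happen belongs in finishing(), while post_handle() is skipped whenever handle() fails or is interrupted.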
Running Tasks
Asynchronous Launch
# Basic launch
task = DataProcessingTask.delay()
# With parameters
task = DataProcessingTask.delay(
    param1='value1',
    param2='value2'
)
# With custom name
task = DataProcessingTask.delay(
    name='January data processing',
    param1='value1'
)
# With file
with open('data.xlsx', 'rb') as fp:
    task = DataProcessingTask.delay(
        fp=fp,
        param1='value1'
    )
# With author
task = DataProcessingTask.delay(
    author=request.user,
    param1='value1'
)
# With environment variables
task = DataProcessingTask.delay(
    envs={'MY_VAR': 'value'},
    param1='value1'
)
Synchronous Launch
# Waits for task completion
task = DataProcessingTask.delay_sync(param1='value1')
Periodic Tasks
Through Django Admin
- Open "Periodic Tasks" section
- Create new task:
  - Name: Task description
  - Task Class: Full path (e.g., myapp.tasks.DataProcessingTask)
  - Period: Cron expression or seconds
  - Args/Kwargs: JSON with parameters
  - Enabled: Yes
Period Examples
# Every 60 seconds
period = "60"
# Every day at 03:00
period = "0 3 * * *"
# Every Monday at 09:00
period = "0 9 * * 1"
# Every hour
period = "0 * * * *"
# Every 5 minutes
period = "*/5 * * * *"
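The period field accepts either a number of seconds or a five-field cron expression. A minimal sketch of telling the two apart is shown below. `classify_period` is a hypothetical helper written for this illustration; the package itself relies on the crontab dependency for real cron parsing, and its validation rules may differ.

```python
def classify_period(period: str):
    """Return ("interval", seconds) or ("cron", fields) for a period string."""
    text = period.strip()
    if text.isdigit():
        # A bare integer is treated as an interval in seconds.
        return ("interval", int(text))
    fields = text.split()
    if len(fields) != 5:
        raise ValueError(f"expected 5 cron fields, got {len(fields)}: {period!r}")
    # Order of cron fields: minute, hour, day of month, month, day of week.
    return ("cron", tuple(fields))
```

For example, "60" is read as a 60-second interval, while "0 3 * * *" is read as minute 0 of hour 3 on every day.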
Data Export
Base Export Class
from bazis.contrib.bg.basic.base_download import BgBaseDownload
class ReportDownload(BgBaseDownload):
    name = 'Report Export'
    file_name = 'report'
    # Fields to export
    fields_read = ['id', 'name', 'created_at', 'status']
    # Fields marked bold in header
    fields_marked = {'name', 'status'}
    # Column sizes
    fields_size = {
        'id': 10,
        'name': 30,
        'created_at': 20,
        'status': 15
    }
    # Row limit per file (for splitting into archive)
    limit_rows_split = 500000

    def get_titles(self):
        """Returns column headers"""
        return {
            'id': 'ID',
            'name': 'Name',
            'created_at': 'Created At',
            'status': 'Status'
        }

    def get_queryset(self):
        """Returns data for export"""
        return MyModel.objects.all().values(*self.fields_read)

    def get_count(self):
        """Returns number of records"""
        return MyModel.objects.count()
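The limit_rows_split setting implies that a large export is cut into several files of bounded size. The idea can be sketched with a generic chunking helper. `split_rows` is a hypothetical function, not part of the package, and how the package actually writes and archives the files is not shown here.

```python
from itertools import islice


def split_rows(rows, limit):
    """Yield lists of at most `limit` rows, preserving the original order."""
    if limit <= 0:
        raise ValueError("limit must be positive")
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, limit))
        if not chunk:
            return
        yield chunk
```

Because the helper consumes any iterable lazily, only one chunk of rows needs to be held in memory at a time.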
Export with Grouped Headers
def get_titles(self):
    return {
        'groups': [
            {
                'name_union': 'Basic Information',
                'columns': ['id', 'name'],
                'marked': True
            },
            {
                'name_union': 'Dates',
                'columns': ['created_at', 'updated_at']
            }
        ],
        'titles': {
            'id': 'ID',
            'name': 'Name',
            'created_at': 'Created',
            'updated_at': 'Updated'
        }
    }
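To see what the grouped structure describes, the sketch below expands it into two header rows: one with the group name above each column and one with the column titles. `header_rows` is a hypothetical helper, and the way the package really renders grouped headers in Excel (merged cells, bold marking) is an assumption left out of this illustration.

```python
def header_rows(spec, fields):
    """Return (group_row, title_row) aligned to the order of `fields`."""
    group_of = {}
    for group in spec.get("groups", []):
        for column in group["columns"]:
            group_of[column] = group["name_union"]
    # Columns outside any group get an empty cell in the group row.
    group_row = [group_of.get(name, "") for name in fields]
    # Columns without a title fall back to the field name.
    title_row = [spec["titles"].get(name, name) for name in fields]
    return group_row, title_row
```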
Django Model Export
from bazis.contrib.bg.basic.base_download_model import BgBaseDownloadModel
from django.apps import apps
class VehicleDownload(BgBaseDownloadModel):
    model = apps.get_model('myapp.Vehicle')
    fields_read = ['id', 'gnum', 'vehicle_model__brand__name', 'country__name']
    # verbose_name of fields will be picked up automatically
Data Import
from bazis.contrib.bg.basic.base_load import BgBaseLoad
class DataImport(BgBaseLoad):
    name = 'Data Import'
    # Fields in column order
    fields_read = ['name', 'price', 'quantity']
    # Skip first row (headers)
    skip_titles = True
    # Progress update period
    progress_count = 1000
    # Track row grouping
    groups_check = False
    # Add meta-data (_meta)
    with_meta = False

    def load(self, reader):
        """
        Process loaded data
        reader - generator of dictionaries with data
        """
        self.next_phase('Saving data')
        for row in reader:
            # row = {'name': '...', 'price': '...', 'quantity': '...'}
            # Create object
            MyModel.objects.create(
                name=row['name'],
                price=float(row['price']),
                quantity=int(row['quantity'])
            )
            self.progress(d_count=100)
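The reader passed to load() yields one dictionary per row, keyed by fields_read in column order. How such a reader could be built from raw spreadsheet rows is sketched below. `rows_to_dicts` is a hypothetical stand-in; the real reader also handles Excel/ZIP parsing, grouping, and meta-data, none of which is modelled here.

```python
def rows_to_dicts(rows, fields_read, skip_titles=True):
    """Yield one dict per data row, mapping field names to cell values."""
    iterator = iter(rows)
    if skip_titles:
        # Drop the header row, if there is one.
        next(iterator, None)
    for row in iterator:
        yield dict(zip(fields_read, row))
```

Cell values arrive as they were read from the file, which is why the load() example above converts price and quantity explicitly.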
Task API
Task Management Methods
# Get task object
task = self.get_task()
# Update task data
self.task_update(
    phase='New phase',
    expected=1000,
    performed=500
)
# Save file in task
with open('result.xlsx', 'rb') as fp:
    self.task_file_save('result.xlsx', fp, file_params={'rows': 1000})
# Get file from task
fp = self.task_file_get()
# Set result
self.set_result({'status': 'success', 'count': 1000})
# Force synchronization
self.flush()
Working with Phases
# Simple phase
self.next_phase('Processing')
# Phase with expectation (number)
self.next_phase('Loading files', expected=100)
# Phase with expectation (dictionary)
self.next_phase('Processing', expected={
    'files': 10,
    'records': 1000
})
# Update progress
self.progress(
    performed=50,    # Performed
    expected=100,    # Expected (can be updated)
    d_time=5,        # Sync every 5 sec
    d_count=100,     # Or every 100 operations
    need_sync=True   # Force synchronization
)
# Progress with dictionary
self.progress(
    performed={'files': 5, 'records': 500},
    d_count=10
)
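The d_time and d_count arguments throttle how often progress is written out: a sync happens after a given number of operations or a given number of seconds, whichever comes first. That rule can be sketched as a small standalone class. `ProgressThrottle` and its `step()` method are hypothetical, and the package's internal bookkeeping may differ.

```python
import time


class ProgressThrottle:
    """Decides when buffered progress should be written out."""

    def __init__(self, d_time=None, d_count=None, clock=time.monotonic):
        self.d_time = d_time      # seconds between syncs, or None to ignore time
        self.d_count = d_count    # operations between syncs, or None to ignore count
        self.clock = clock        # injectable clock, useful for testing
        self.last_sync = clock()
        self.pending = 0

    def step(self, need_sync=False):
        """Register one operation and return True when a sync is due."""
        self.pending += 1
        due = need_sync
        if self.d_count is not None and self.pending >= self.d_count:
            due = True
        if self.d_time is not None and self.clock() - self.last_sync >= self.d_time:
            due = True
        if due:
            # Reset both counters after a sync.
            self.pending = 0
            self.last_sync = self.clock()
        return due
```

Throttling of this kind keeps a tight loop from issuing one database write per iteration while still giving observers reasonably fresh progress.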
Logging
# Use self.log to write to task log
self.log.info('Information message')
self.log.warning('Warning')
self.log.error('Error')
self.log.debug('Debug information')
# Logs are automatically saved in task.log field
Configuration
Environment Variables
# Scheduler
BS_BAZIS_TASK_SCHEDULER_HOST=localhost # Scheduler host
BS_BAZIS_TASK_SCHEDULER_PORT=49001 # Scheduler healthcheck port
BS_BAZIS_TASK_SCHEDULER_IDLE=0.2 # Delay between iterations, sec
# Handlers
BS_BAZIS_TASK_HANDLERS_LOCAL=5 # Number of local handlers
BS_BAZIS_TASK_HANDLERS_GLOBAL=50 # Global number of handlers
BS_BAZIS_TASK_HANDLER_PORT=49100 # Base port for handlers
BS_BAZIS_TASK_HANDLER_IDLE=0.5 # Delay between task checks, sec
# Storage
BS_BAZIS_TASK_FOLDER=bg # Folder for task files
BS_BAZIS_STORAGE_BG=None # Storage class (default FileSystemStorage)
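The variables above share a BS_ prefix, and the same names without that prefix appear as Django settings. One way such values could be read with typed defaults is sketched below. `load_bg_settings` and `DEFAULTS` are hypothetical, and the framework's own settings loader is not shown in this document, so treat the prefix handling and type coercion as assumptions.

```python
import os

# Defaults copied from the variable list above.
DEFAULTS = {
    "BAZIS_TASK_SCHEDULER_HOST": "localhost",
    "BAZIS_TASK_SCHEDULER_PORT": 49001,
    "BAZIS_TASK_SCHEDULER_IDLE": 0.2,
    "BAZIS_TASK_HANDLERS_LOCAL": 5,
    "BAZIS_TASK_HANDLERS_GLOBAL": 50,
    "BAZIS_TASK_HANDLER_PORT": 49100,
    "BAZIS_TASK_HANDLER_IDLE": 0.5,
    "BAZIS_TASK_FOLDER": "bg",
}


def load_bg_settings(environ=None, prefix="BS_"):
    """Read prefixed variables, coercing each to the type of its default."""
    environ = os.environ if environ is None else environ
    settings = {}
    for name, default in DEFAULTS.items():
        raw = environ.get(prefix + name)
        settings[name] = default if raw is None else type(default)(raw)
    return settings
```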
Django Settings
# settings.py
# Number of local handlers
BAZIS_TASK_HANDLERS_LOCAL = 3
# Global number of handlers
BAZIS_TASK_HANDLERS_GLOBAL = 50
# Base port for handlers
BAZIS_TASK_HANDLER_PORT = 8100
# Scheduler task check interval (seconds)
BAZIS_TASK_SCHEDULER_IDLE = 1
# Handler task check interval (seconds)
BAZIS_TASK_HANDLER_IDLE = 1
# Custom storage for task files
BAZIS_STORAGE_BG = 'myapp.storage.S3Storage'
Task Management
Through Django Admin
- Background Tasks (Task)
  - View all tasks
  - Filter by state, date
  - Interrupt running tasks
  - View logs and results
  - Download result files
- Periodic Tasks (TaskCron)
  - Create/edit periodic tasks
  - Enable/disable
  - View next run time
- Handlers (TaskHandler)
  - View active handlers
  - Monitor load
Programmatically
from django.apps import apps
Task = apps.get_model('bg.Task')
# Get task
task = Task.objects.get(id=task_id)
# Interrupt task
task.interrupt = True
task.save()
# Check state
if task.is_done:
    if task.is_success:
        print('Successfully completed')
    elif task.is_error:
        print(f'Error: {task.error}')
    elif task.is_interrupt:
        print('Interrupted')
# Get result
result = task.result
# Download file
if task.file:
    with task.file.open('rb') as fp:
        data = fp.read()
Running Services
# Start scheduler
python manage.py bg_scheduler
# Start handler manually
python manage.py bg_handler --host 127.0.0.1 --port 8100
# In production, it's recommended to use supervisor or systemd
Examples
Example 1: Simple Task with Progress
from bazis.contrib.bg.basic.base import BgBase
class SimpleTask(BgBase):
    name = 'Simple Task'

    def handle(self):
        self.next_phase('Processing', expected=100)
        for i in range(100):
            # Your logic
            self.log.info(f'Processing item {i+1}')
            self.progress(d_count=10)
# Launch
task = SimpleTask.delay()
Example 2: Task with File
from io import BytesIO

from bazis.contrib.bg.basic.base import BgBase

class FileProcessingTask(BgBase):
    name = 'File Processing'

    def handle(self):
        # Get file
        fp = self.task_file_get()
        if not fp:
            raise Exception('File not provided')
        self.next_phase('Reading file')
        data = fp.read()
        # Processing...
        result = self.process_data(data)
        # Save result
        result_fp = BytesIO(result)
        self.task_file_save('result.txt', result_fp)

    def process_data(self, data):
        # Your processing logic
        return data.upper()

# Launch with file
with open('input.txt', 'rb') as fp:
    task = FileProcessingTask.delay(fp=fp)
Example 3: Report Export
from bazis.contrib.bg.basic.base_download_model import BgBaseDownloadModel
from django.apps import apps
class MonthlyReport(BgBaseDownloadModel):
    model = apps.get_model('myapp.Order')
    fields_read = ['id', 'number', 'customer__name', 'total', 'created_at']
    fields_marked = {'total'}
    file_name = 'monthly_report'

# Launch
task = MonthlyReport.delay()
# Get file after completion
Task = apps.get_model('bg.Task')
task = Task.objects.get(id=task.id)
if task.is_success:
    file_url = task.file.url
Example 4: Data Import
from bazis.contrib.bg.basic.base_load import BgBaseLoad
from django.apps import apps
class ProductImport(BgBaseLoad):
    name = 'Product Import'
    fields_read = ['name', 'sku', 'price', 'quantity']
    skip_titles = True

    def load(self, reader):
        Product = apps.get_model('myapp.Product')
        self.next_phase('Importing products')
        created = 0
        updated = 0
        for row in reader:
            product, is_created = Product.objects.update_or_create(
                sku=row['sku'],
                defaults={
                    'name': row['name'],
                    'price': row['price'],
                    'quantity': row['quantity']
                }
            )
            if is_created:
                created += 1
            else:
                updated += 1
            self.progress(d_count=100)
        self.set_result({'created': created, 'updated': updated})

# Launch with file
with open('products.xlsx', 'rb') as fp:
    task = ProductImport.delay(fp=fp)
Example 5: Task with Parameters
from bazis.contrib.bg.basic.base import BgBase
from django.apps import apps
class SendEmailTask(BgBase):
    name = 'Send Email'
    parallel = 5  # Maximum 5 simultaneous sends

    def handle(self, email_list, subject, message):
        """
        Args:
            email_list: list of email addresses
            subject: email subject
            message: email body
        """
        self.next_phase('Sending emails', expected=len(email_list))
        for i, email in enumerate(email_list):
            try:
                # Send email
                self.send_email(email, subject, message)
                self.log.info(f'Sent to {email}')
            except Exception as e:
                self.log.error(f'Error sending to {email}: {e}')
            self.progress(performed=i+1, d_count=10)

    def send_email(self, email, subject, message):
        from django.core.mail import send_mail
        send_mail(subject, message, 'noreply@example.com', [email])

# Launch
emails = ['user1@example.com', 'user2@example.com']
task = SendEmailTask.delay(
    email_list=emails,
    subject='Important notification',
    message='Message text'
)
Example 6: Task with Locks
from bazis.contrib.bg.basic.base import BgBase
class DataExportTask(BgBase):
    name = 'Data Export'
    parallel = 1  # Only one task simultaneously
    blocked = [
        'myapp.tasks.DataImportTask',  # Blocked by import
        'myapp.tasks.DataUpdateTask'   # Blocked by update
    ]

    def handle(self):
        self.next_phase('Exporting data')
        # Export logic
        pass

class DataImportTask(BgBase):
    name = 'Data Import'
    parallel = 1
    blocked = [
        'myapp.tasks.DataExportTask',
        'myapp.tasks.DataUpdateTask'
    ]

    def handle(self):
        self.next_phase('Importing data')
        # Import logic
        pass
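The parallel and blocked attributes together determine whether a waiting task may start. A plausible reading of that rule is sketched below as a pure function. `can_start` is hypothetical, and the scheduler's real check (which works against task records in the database) may differ in detail.

```python
from collections import Counter


def can_start(candidate, running, parallel=1, blocked=()):
    """
    Decide whether `candidate` may start now.

    candidate: dotted path of the task class that wants to run
    running:   dotted paths of the tasks currently running
    parallel:  maximum simultaneous runs of `candidate`
    blocked:   dotted paths of task classes that block `candidate`
    """
    counts = Counter(running)
    if counts[candidate] >= parallel:
        # The per-class concurrency limit is already reached.
        return False
    # Any running blocker keeps the candidate waiting.
    return not any(counts[name] for name in blocked)
```

In the example above, an export would wait while an import is running and the other way round, because each class lists the other in blocked.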
Development
Setting Up Development Environment
# Clone repository
git clone git@github.com:ecofuture-tech/bazis-bg.git
cd bazis-bg
# Install dependencies
uv sync --dev
# Run tests
pytest
# Check code
ruff check .
# Format code
ruff format .
Test Structure
tests/
├── test_base.py # Base class tests
├── test_download.py # Export tests
├── test_load.py # Import tests
├── test_scheduler.py # Scheduler tests
└── test_handler.py # Handler tests
Contributing
We welcome your contributions! Here's how you can help:
- Fork the repository
- Create a branch for new functionality (git checkout -b feature/amazing-feature)
- Make changes
- Run tests (pytest)
- Commit changes (git commit -m 'Add amazing feature')
- Push to branch (git push origin feature/amazing-feature)
- Open Pull Request
Please ensure that:
- Tests are written for new functionality
- Documentation is updated
- Existing code style is maintained
- Changes are added to changelog
License
Apache License 2.0
See LICENSE file for details.
Links
- Bazis Core — framework base package
- Issue Tracker — report bug or request feature
- Bazis Documentation — main documentation
Support
If you have questions or problems:
- Check documentation
- Search in existing issues
- Create new issue with detailed description
Package Ecosystem
Bazis supports extensions through additional packages:
- bazis - framework core
- bazis-bg - background task system
- bazis-test-utils - testing utilities
- bazis-<n> - other extensions (add bazis- prefix to name)
All extension packages require installation of the base bazis package.
Made with ❤️ by Bazis team