Async tasks made simple: multi-threaded, stoppable, and timeout-ready.

Project description

async-tasks

A Python library for managing asynchronous tasks, with support for multi-threading, task cancellation, and timeout handling.

Key Features

  • Timeout Handling: Automatically stops tasks that exceed a specified time limit.
  • Task Cancellation: Provides the ability to stop or cancel tasks at any point during execution.
  • Multi-threading: Runs tasks asynchronously in separate threads to avoid blocking the main thread.

Installation

Install async-tasks from PyPI. Note that the distribution is named async-tasks-python, while the import name is async_tasks:

pip install async-tasks-python

Usage Example

Basic Example: Running and Stopping a Task

import time
from async_tasks import AsyncTask

# Define a long-running task
def long_running_task():
    for i in range(10):
        time.sleep(1)  # Simulate work with a 1-second delay
        print(f"Task running... {i + 1}")
    return "Task completed successfully"

# Run the task asynchronously in a separate thread
task = AsyncTask.run_async(long_running_task, task_id="task1")

# Stop the task after 3 seconds
time.sleep(3)
task.stop()

# Alternatively, cancel the task by its task ID (use this instead of task.stop())
AsyncTask.cancel("task1")
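
This README does not describe how task.stop() works internally. For context only, a common way to make a thread stoppable with the standard library is a cooperative flag that the worker checks between units of work. The sketch below uses threading.Event; the names cooperative_task, stop_event, and progress are illustrative and are not part of async-tasks.

```python
import threading
import time


def cooperative_task(stop_event: threading.Event, progress: list) -> None:
    """Do up to 20 short units of work, checking the stop flag between units."""
    for i in range(20):
        # Event.wait() acts as an interruptible sleep: it returns True
        # as soon as the flag is set, and False if the timeout elapses.
        if stop_event.wait(timeout=0.05):
            return
        progress.append(i + 1)


stop_event = threading.Event()
progress: list = []
worker = threading.Thread(target=cooperative_task, args=(stop_event, progress))
worker.start()

time.sleep(0.2)         # let a few units of work run
stop_event.set()        # request a stop; the worker exits at its next check
worker.join(timeout=1)  # wait for the worker to finish

print(f"Worker alive: {worker.is_alive()}, units completed: {len(progress)}")
```

A cooperative flag only works when the task function checks it. A plain function such as long_running_task above contains no such check, which is the case a stop() or cancel() call is meant to cover.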

Example: Timeout and Callback

import time
from async_tasks import AsyncTask

# Define a long-running task
def long_running_task():
    for i in range(10):
        time.sleep(1)  # Simulate work with a 1-second delay
        print(f"Task running... {i + 1}")
    return "Task completed successfully"

# Define a callback function to handle task completion
def on_completion(result, error):
    print("\n✅ Task finished")
    if error:
        print(f"Error: {error}")
    else:
        print(f"Result: {result}")

# Run the task with a timeout (3 seconds)
# If the task exceeds this time, it will be automatically stopped
AsyncTask.run_async(
    long_running_task,
    timeout=3,  # Task will stop if it takes longer than 3 seconds
    on_completion=on_completion,  # Callback function on task completion
)

# Allow some time for the task to run and time out
time.sleep(5)
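
For comparison, the sketch below reproduces the (result, error) callback convention using only concurrent.futures. It is not how async-tasks is implemented, and run_with_timeout is a hypothetical helper. It also differs in two ways: it blocks the caller while waiting, and the standard library only stops waiting on a timeout, so the worker thread keeps running to completion in the background.

```python
import time
from concurrent.futures import ThreadPoolExecutor

outcomes = []  # collected (result, error) pairs, for inspection


def slow_task():
    time.sleep(0.5)  # simulate work
    return "Task completed successfully"


def on_completion(result, error):
    outcomes.append((result, error))
    if error:
        print(f"Error: {error!r}")
    else:
        print(f"Result: {result}")


def run_with_timeout(func, timeout, callback):
    """Hypothetical helper: wait up to `timeout` seconds, then report via callback."""
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(func)
    try:
        result = future.result(timeout=timeout)
    except Exception as err:  # includes the timeout error raised by result()
        callback(None, err)
    else:
        callback(result, None)
    finally:
        # Do not wait for the worker: after a timeout it is still running.
        executor.shutdown(wait=False)


run_with_timeout(slow_task, timeout=0.1, callback=on_completion)  # times out
run_with_timeout(slow_task, timeout=2, callback=on_completion)    # completes
```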

Example: Managing a Thread Pool with AsyncTasksExecutor

AsyncTasksExecutor manages a pool of asynchronous tasks in a manner similar to ThreadPoolExecutor, letting you cap the number of concurrent tasks. The example below submits ten tasks to an executor with five workers, collects each outcome through a queue, and tracks progress with a tqdm progress bar (tqdm is a separate package and must be installed for this example).

from queue import Queue
from time import sleep

import tqdm

from async_tasks.executor import AsyncTasksExecutor

# Number of concurrent workers
max_workers = 5

# Initialize result storage and result queue
results = []
result_queue = Queue()


def process_task(idx: int) -> None:
    try:
        sleep(1)  # Simulate work with a 1-second delay
        if idx % 2:  # Simulate failure for tasks with odd indices
            raise Exception(f"Task {idx + 1} failed")

        # If the task succeeds, put the result in the queue
        result_queue.put((None, f"Task {idx + 1} completed successfully"))
    except Exception as err:
        result_queue.put((err, None))  # In case of failure, put the error in the queue


# Create an AsyncTasksExecutor to manage a pool of concurrent threads
with AsyncTasksExecutor(max_workers=max_workers) as executor:
    # Submit tasks to the executor
    for idx in range(10):
        executor.submit(process_task, idx)

    # Track and display progress using tqdm for a progress bar
    completed = 0
    with tqdm.tqdm(total=10, desc="Processing Tasks") as process_bar:
        try:
            while completed < 10:
                # Retrieve the result of a completed task from the queue
                res = result_queue.get()
                results.append(res)
                completed += 1
                process_bar.update(1)  # Update the progress bar
        except KeyboardInterrupt:
            # Gracefully handle KeyboardInterrupt and shut down the executor
            print("\nProcess interrupted. Shutting down executor...")
            executor.shutdown()

# Optionally, print the results
print("\nAll tasks completed:")
for result in results:
    print(result)
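
Since the executor is described as similar to ThreadPoolExecutor, it may help to see the same workload written against the standard library. This is a comparison sketch, not a statement about how AsyncTasksExecutor behaves: here the task returns or raises directly, and as_completed replaces the result queue. The delay is shortened to keep the run quick.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep


def process_task(idx: int) -> str:
    sleep(0.05)  # simulate work
    if idx % 2:  # simulate failure for tasks with odd indices
        raise RuntimeError(f"Task {idx + 1} failed")
    return f"Task {idx + 1} completed successfully"


results = []  # (error, message) pairs, matching the queue example above
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(process_task, idx) for idx in range(10)]
    for future in as_completed(futures):
        error = future.exception()  # None if the task succeeded
        if error is not None:
            results.append((error, None))
        else:
            results.append((None, future.result()))

successes = [msg for err, msg in results if err is None]
failures = [err for err, msg in results if err is not None]
print(f"{len(successes)} succeeded, {len(failures)} failed")
```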

License

This project is licensed under the MIT License - see the LICENSE file for details.

Download files

Source Distribution

async_tasks_python-1.0.0.tar.gz (5.0 kB view details)

Uploaded Source

Built Distribution

async_tasks_python-1.0.0-py3-none-any.whl (6.1 kB view details)

Uploaded Python 3

File details

Details for the file async_tasks_python-1.0.0.tar.gz.

File metadata

  • Download URL: async_tasks_python-1.0.0.tar.gz
  • Upload date:
  • Size: 5.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.4 CPython/3.13.0 Darwin/23.6.0

File hashes

Hashes for async_tasks_python-1.0.0.tar.gz
Algorithm Hash digest
SHA256 85713376a07df102c4cd1b1d8c93b04ad0b0cd685051d3a66bc35edab106427d
MD5 37d41ea628dbc2c59e10baf8831fc3e5
BLAKE2b-256 e627fe107097e484da81e90d0796a4ec3805a1506f5d1d4f2bd43f0679b0bc72

File details

Details for the file async_tasks_python-1.0.0-py3-none-any.whl.

File hashes

Hashes for async_tasks_python-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 e52dddee63911caf58aae524367401637f8431d544a83af679e0e829ed84019e
MD5 751b6d0b92e5f4b9c9b7dd207289042a
BLAKE2b-256 4275f2ea964f3f25f48834b28f323244800eac23f069f41b0c93560d943ef8cf
