Monorepo for bergtalzug

Bergtalzug

Bergtalzug is a flexible ETL (Extract, Transform, Load) framework for processing data concurrently and efficiently in Python. It uses multiple worker pools and Culsans queues (best async Python queue out there!) to run data processing pipelines. The main use case is data that needs to be processed efficiently and in parallel but doesn't warrant a more complex setup with Spark or Apache Airflow, or cases where you want a tool that's easier to run in native Python.

Architecture

Bergtalzug implements a dynamic multi-stage pipeline architecture where you can define any number of processing stages, each with its own:

  • Execution model: Async, ThreadPoolExecutor, ProcessPoolExecutor, or InterpreterPoolExecutor (Python 3.14+)
  • Worker count: Configure concurrency per stage independently
  • Queue size: Control backpressure with per-stage queue limits
  • Custom logic: Each stage is a function you define

Pipeline Flow

Data flows through stages as WorkItem objects. Each stage has its own pool of workers:

                    Stage1      Stage2      Stage3      Stage4
                    Stage1      Stage2      Stage3      Stage4
refill_queue()  ->  Stage1  ->  Stage2  ->  Stage3  ->  Stage4
                    Stage1      Stage2      Stage3      Stage4
                    Stage1      Stage2      Stage3      Stage4

Each WorkItem goes through the entire pipeline sequentially before being marked as "completed". Items are retrieved by implementing the refill_queue() method, which the pipeline calls automatically when the first stage's queue needs refilling. The size of each queue that feeds into each stage is defined in the Stage configuration you create.

Benefits of Multi-Stage Design

The multi-stage architecture with separate worker pools provides several advantages:

  1. Buffering: Data can be buffered between stages, allowing fast stages to continue working even when slower stages are backed up
  2. Resource Optimization: I/O-heavy stages can use async workers, while CPU-heavy stages can use process pools
  3. Independent Scaling: Each stage can be scaled independently based on its specific bottlenecks
  4. Better Throughput: Workers in one stage can start on the next item immediately after finishing, without waiting for the entire pipeline
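
The buffering and backpressure idea can be illustrated with a minimal sketch using only the standard library. It uses asyncio.Queue as a stand-in for the per-stage queues, so it is not Bergtalzug's actual implementation; the names producer, consumer and run_demo are made up for this illustration. The fast stage is suspended whenever the bounded queue is full, so the buffer never grows past the configured size.

```python
import asyncio


async def producer(queue: asyncio.Queue, n_items: int, depths: list) -> None:
    """Fast stage: put() suspends whenever the bounded queue is full."""
    for i in range(n_items):
        await queue.put(i)
        depths.append(queue.qsize())  # record buffer depth after each put
    await queue.put(None)  # sentinel: no more work


async def consumer(queue: asyncio.Queue, results: list) -> None:
    """Slow stage: drains the queue one item at a time."""
    while True:
        item = await queue.get()
        if item is None:
            break
        await asyncio.sleep(0.001)  # simulate slower work
        results.append(item * 2)


async def run_demo(n_items: int = 20, queue_size: int = 5):
    # maxsize plays the role of a per-stage queue limit (assumption)
    queue = asyncio.Queue(maxsize=queue_size)
    depths, results = [], []
    await asyncio.gather(
        producer(queue, n_items, depths),
        consumer(queue, results),
    )
    return depths, results


if __name__ == "__main__":
    depths, results = asyncio.run(run_demo())
    print(f"max buffered items: {max(depths)}, processed: {len(results)}")
```

A larger queue_size lets the fast stage run further ahead of the slow one, at the cost of holding more items in memory.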

Execution Types

Bergtalzug supports four execution models for maximum flexibility:

  • ASYNC: Native asyncio for I/O-bound operations (API calls, database queries, file operations)
  • THREAD: ThreadPoolExecutor for blocking I/O operations that can't use async
  • PROCESS: ProcessPoolExecutor for CPU-intensive operations that need true parallelism
  • INTERPRETER: InterpreterPoolExecutor for CPU-intensive operations with true multi-core parallelism (Python 3.14+ only)

You should only use ThreadPoolExecutor if for some reason async is not an option - e.g. the code you are executing releases the GIL but doesn't have an async interface. If your code doesn't release the GIL you can use either the InterpreterPoolExecutor or the ProcessPoolExecutor.

New in Python 3.14, InterpreterPoolExecutor provides true multi-core parallelism similar to ProcessPoolExecutor, but each worker runs in its own isolated interpreter instead of a separate process. Each interpreter runs in its own thread and has its own GIL, which enables genuine multi-core CPU utilization. Each interpreter is completely isolated: separate module imports, separate sys.stdout, etc. One of the biggest benefits is that InterpreterPoolExecutor supports zero-copy operations on various immutable types, e.g. bytes, tuple etc.

In general (assuming you use this framework for data processing), InterpreterPoolExecutor is usually a better pick than ProcessPoolExecutor, since shareable data can be passed without being pickled (avoiding serialization overhead) and without holding more memory than required.
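
To make the difference between async and executor-based execution concrete, here is a small standard-library sketch of how a blocking function can be handed to a ThreadPoolExecutor from async code. This shows the general pattern, not Bergtalzug's internals; checksum and checksum_all are hypothetical names. hashlib is used because it releases the GIL while hashing larger inputs, which is the situation where threads are a reasonable choice.

```python
import asyncio
import hashlib
from concurrent.futures import ThreadPoolExecutor


def checksum(payload: bytes) -> str:
    """Blocking function without an async interface."""
    return hashlib.sha256(payload).hexdigest()


async def checksum_all(payloads: list[bytes], workers: int = 4) -> list[str]:
    """Run the blocking function in a thread pool without blocking the event loop."""
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [loop.run_in_executor(pool, checksum, p) for p in payloads]
        # gather() preserves the order of the inputs
        return await asyncio.gather(*futures)


if __name__ == "__main__":
    digests = asyncio.run(checksum_all([b"first file", b"second file"]))
    print(digests)
```

For work that holds the GIL, the same pattern should work with a ProcessPoolExecutor (or, on Python 3.14+, an InterpreterPoolExecutor) in place of the thread pool, subject to the restrictions on what can be passed to those executors.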

You can review more information on the different type of executors here:

Usage Examples

One use case is downloading data (async operation), processing it (CPU-heavy operation) and storing it (async operation). Depending on what you are doing, the pipeline could be fully async (e.g. the library/code you use to process the data supports an async interface) or a mix of async and sync stages (InterpreterPoolExecutor, or ProcessPoolExecutor if interpreters can't be used).

Example 1

Task: Download data, validate it with DuckDB and store it in S3. This could be a fully async pipeline with 3 stages since there is an async DuckDB library. So why not just have "1 big stage"? What is even the point of using a pipeline here? Even if you use DuckDB's async interface, it still spawns real DuckDB instances in the background, which utilize your CPU cores.

If you want to download 100 files at the same time and have each async task run one streamlined process (download, process, upload) replicated 100 times, this would spawn 100 DuckDB instances, which wouldn't be efficient at all and would only slow down processing. Instead, you can spawn 100 download tasks, 20 processing tasks (utilizing all of your cores) and 100 upload tasks. If the files are big, the download takes some time (network I/O) while the processing in DuckDB can be quite fast, so each of the 3 tasks (download, process and upload) runs as efficiently as possible.
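
The effect of giving each stage its own worker count can be sketched with the standard library alone. The Stage class below is a hypothetical stand-in that uses a semaphore instead of Bergtalzug's worker pools and queues, and the sleep is a placeholder for real work. It records the peak number of items inside each stage at once, showing that the CPU-heavy middle stage never exceeds its limit even when 100 files are in flight.

```python
import asyncio


class Stage:
    """Hypothetical stand-in for a pipeline stage with a fixed worker count."""

    def __init__(self, name: str, workers: int) -> None:
        self.name = name
        self.limit = asyncio.Semaphore(workers)
        self.active = 0
        self.peak = 0

    async def run(self, item: int) -> int:
        async with self.limit:  # at most `workers` items inside at once
            self.active += 1
            self.peak = max(self.peak, self.active)
            await asyncio.sleep(0.001)  # placeholder for real work
            self.active -= 1
        return item


async def process(item: int, stages: list[Stage]) -> int:
    """Send one item through all stages in order."""
    for stage in stages:
        item = await stage.run(item)
    return item


async def run_demo(n_files: int = 100) -> dict[str, int]:
    # Worker counts taken from the example: 100 / 20 / 100
    stages = [Stage("download", 100), Stage("process", 20), Stage("upload", 100)]
    await asyncio.gather(*(process(i, stages) for i in range(n_files)))
    return {stage.name: stage.peak for stage in stages}


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```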

Example 2

Task: Download data, perform some calculations using NumPy and save it to a file.

The first and last stages obviously should be async, but the middle one is a bit more difficult. NumPy releases the GIL for most operations, but not for all. It also does not provide an async interface, so we have to decide between threads, processes or interpreters. If your workload releases the GIL, use the ThreadPoolExecutor: threads are easier to work with (passing arguments is simpler, no serialization etc.) and the underlying C library behind NumPy will utilize all cores anyway.

If your workload doesn't release the GIL use the InterpreterPoolExecutor since this way you can run tasks that don't release the GIL concurrently without the serialization and copy overhead of the ProcessPoolExecutor. If for some reason you can't use interpreters - use processes.

Note: Of course you can chain as many stages as you like, but keep in mind that each stage adds overhead. It's advisable to complete the work in as few stages as possible.

Quick Start

1. Define Your Pipeline Configuration

Create a configuration specifying your stages:

from bergtalzug import ETLPipelineConfig, StageConfig, ExecutionType

config = ETLPipelineConfig(
    pipeline_name="my_pipeline",
    stages=[
        StageConfig(
            name="fetch",
            execution_type=ExecutionType.ASYNC,
            workers=10,
            queue_size=100, # size of the queue that feeds this stage - how many items can wait before being processed in this stage
        ),
        StageConfig(
            name="parse",
            execution_type=ExecutionType.THREAD,
            workers=5,
            queue_size=50,
        ),
        StageConfig(
            name="compute",
            execution_type=ExecutionType.PROCESS,
            workers=4,
            queue_size=20,
        ),
        StageConfig(
            name="store",
            execution_type=ExecutionType.ASYNC,
            workers=10,
            queue_size=100,
        ),
    ],
    queue_refresh_rate=1.0,  # Check first queue every second and call refill_queue()
    stats_interval_seconds=10.0,  # Report stats every 10 seconds
    enable_tracking=True,  # Track items and collect results
)

Note: The names created, completed and error are reserved and can't be used as Stage names.

2. Create Your Pipeline Class

Inherit from ETLPipeline and implement refill_queue():

from bergtalzug import ETLPipeline, ETLPipelineConfig, WorkItem

class MyPipeline(ETLPipeline):
    def __init__(self, config: ETLPipelineConfig):
        super().__init__(config)
        self.items_to_process = 100
        self.items_generated = 0

    async def refill_queue(self, count: int) -> list[WorkItem]:
        """
        Generate work items to feed into the pipeline.

        Args:
            count: Number of items requested by the pipeline

        Returns:
            List of WorkItems, or empty list to signal completion
        """
        items = []
        for _ in range(count):
            if self.items_generated >= self.items_to_process:
                break

            items.append(WorkItem(data={"id": self.items_generated, "url": f"https://api.example.com/item/{self.items_generated}"}))
            self.items_generated += 1

        return items  # Return empty list when no more work
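
If the work comes from an iterator (for example a file listing or a database cursor) rather than a counter, the same contract of "up to count items, empty list when exhausted" can be met with itertools.islice. The sketch below is standalone and uses plain dicts; make_source and take_batch are hypothetical helpers, and in a real refill_queue() each dict would be wrapped in a WorkItem.

```python
from itertools import islice
from typing import Iterator


def make_source(total: int) -> Iterator[dict]:
    """Lazily yield the payloads that would become WorkItem data."""
    for i in range(total):
        yield {"id": i, "url": f"https://api.example.com/item/{i}"}


def take_batch(source: Iterator[dict], count: int) -> list[dict]:
    """Return up to `count` payloads; an empty list means the source is exhausted."""
    return list(islice(source, count))


if __name__ == "__main__":
    source = make_source(5)
    while batch := take_batch(source, 3):
        print([payload["id"] for payload in batch])
```

Because the iterator keeps its own position, no separate items_generated counter is needed.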

3. Define Stage Handler Functions

Create a handler function for each stage. Important: The function signature depends on the execution type!

For ASYNC and THREAD stages:

# ASYNC handler - use async def
async def fetch_data(item: WorkItem) -> WorkItem:
    """Fetch data from API"""
    import aiohttp
    async with aiohttp.ClientSession() as session:
        async with session.get(item.data["url"]) as resp:
            item.data["raw_data"] = await resp.text()  # keep the raw text; the "parse" stage decodes it with json.loads()
    return item

# THREAD handler - use regular def
def parse_data(item: WorkItem) -> WorkItem:
    """Parse and validate data (blocking operation)"""
    import json
    parsed = json.loads(item.data["raw_data"])
    item.data["parsed"] = parsed
    return item

⚠️ For PROCESS and INTERPRETER stages:

CRITICAL: Handlers for ProcessPoolExecutor and InterpreterPoolExecutor stages have a different function signature! This is due to a limitation where the WorkItem class can't be pickled and passed to these executors. Instead, the job_id and data are passed separately. As a side effect, you do not have access to WorkItem.metadata. The metadata could potentially be passed as an optional argument if a flag is set in the config; please open an issue if this is something you think is needed.

If you get an "args not shareable" error, it is very likely that you didn't import one of the dependencies you are using inside your function. E.g. if you use time.sleep(), you need to call import time inside the function, not at module level.

# PROCESS handler example
import logging
import math
from typing import Any

def compute_data(job_id: str, data: Any) -> Any:
    """
    CPU-intensive computation using ProcessPoolExecutor.

    - Takes job_id and data as separate parameters (not WorkItem!)
    - Returns only the modified data (not WorkItem!)
    - Must be a module-level function (not a method or lambda since they can't be pickled)
    """
    logging.info(f"Processing {job_id}")

    # Do CPU-intensive work
    result = math.sqrt(data["value"])
    data["computed"] = result

    return data  # Return data only, not WorkItem

# INTERPRETER handler example (Python 3.14+)
def enrich_data(job_id: str, data: Any) -> Any:
    """
    CPU-intensive computation using InterpreterPoolExecutor.

    - Takes job_id and data as separate parameters (not WorkItem!)
    - Each interpreter is completely isolated
    - ALL imports must be inside the function
    """
    import math  # Must import inside - each interpreter is separate

    print(f"[INTERPRETER] Enriching {job_id}")

    # Do CPU-intensive work
    result = math.sqrt(data["value"]) * 2
    data["enriched"] = result

    return data  # Return data only, not WorkItem

ASYNC handler for final stage:

async def store_data(item: WorkItem) -> WorkItem:
    """Store results asynchronously"""
    # Save to database, upload to S3, etc.
    # save_to_database() is a placeholder for your own coroutine
    await save_to_database(item.data)
    return item

4. Register Handlers and Run

import asyncio

async def main():
    # Create pipeline
    pipeline = MyPipeline(config)

    # Register handler for each stage
    pipeline.register_stage_handler("fetch", fetch_data)
    pipeline.register_stage_handler("parse", parse_data)
    pipeline.register_stage_handler("compute", compute_data)
    pipeline.register_stage_handler("store", store_data)

    # Optional: Add completion callback
    def on_complete(result):
        if result.success:
            print(f"✓ Item {result.job_id} completed in {result.total_duration:.2f}s")
        else:
            print(f"✗ Item {result.job_id} failed: {result.error}")

    pipeline.add_completion_callback(on_complete)

    # Option A: run the pipeline (blocks until completion)
    results = await pipeline.run()

    # Option B (use instead of Option A): start it in the background
    # await pipeline.start()
    # ... do some other actions ...
    # results = await pipeline.run()  # await until it's done

    # Analyze results
    successful = sum(1 for r in results if r.success)
    print(f"Processed {len(results)} items, {successful} successful")

if __name__ == "__main__":
    asyncio.run(main())

Important Warnings and Best Practices

Function Signatures

The function signature differs based on execution type:

Execution Type | Function Signature                            | Returns
ASYNC          | async def handler(item: WorkItem) -> WorkItem | WorkItem
THREAD         | def handler(item: WorkItem) -> WorkItem       | WorkItem
PROCESS        | def handler(job_id: str, data: Any) -> Any    | Any (data only)
INTERPRETER    | def handler(job_id: str, data: Any) -> Any    | Any (data only)

The data returned by PROCESS and INTERPRETER executors will be packaged once again into the same WorkItem object.

Why the difference? ProcessPoolExecutor and InterpreterPoolExecutor require picklable data. The WorkItem object with all its metadata cannot be efficiently pickled, so we extract job_id and data before passing to these executors.
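
The unpack-and-repackage step can be pictured roughly as follows. This is a hedged sketch, not Bergtalzug's code: Item is a hypothetical stand-in for WorkItem, run_isolated is an invented helper, and a ThreadPoolExecutor is used only so the example runs anywhere. Only job_id and data are handed to the executor, and the returned data is put back into the same object, so the metadata never crosses the boundary.

```python
from concurrent.futures import Executor, ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class Item:
    """Hypothetical stand-in for WorkItem; not the library's class."""
    job_id: str
    data: Any
    metadata: dict = field(default_factory=dict)


def run_isolated(
    executor: Executor,
    handler: Callable[[str, Any], Any],
    item: Item,
) -> Item:
    """Pass only job_id and data to the executor, then repackage the result."""
    future = executor.submit(handler, item.job_id, item.data)
    item.data = future.result()  # metadata stays untouched on this side
    return item


def double_value(job_id: str, data: Any) -> Any:
    """Handler with the PROCESS/INTERPRETER signature: returns data only."""
    return {"value": data["value"] * 2}


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=1) as pool:
        item = Item("job-1", {"value": 21}, {"stage": "compute"})
        print(run_isolated(pool, double_value, item))
```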

Resource Considerations

  • Queue Sizes: Larger queues buffer more work but use more memory
  • Worker Counts:
    • ASYNC: Can be high (50-100+) for I/O-bound work
    • THREAD: Depends on your use case, see examples
    • PROCESS: Should match CPU cores (use os.cpu_count())
    • INTERPRETER: Should match CPU cores, similar to PROCESS
  • Memory: Each worker & queue holds items in memory; monitor usage
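
As a small illustration of sizing CPU-bound stages, the helper below derives a worker count from os.cpu_count(). The function name and the reserve parameter are assumptions made for this sketch. Note that os.cpu_count() can return None, so a fallback is needed.

```python
import os


def cpu_bound_workers(reserve: int = 0) -> int:
    """Worker count for PROCESS/INTERPRETER stages: one per core, minus a reserve."""
    cores = os.cpu_count() or 1  # cpu_count() may return None
    return max(1, cores - reserve)


if __name__ == "__main__":
    # Leave one core free for the event loop and the async stages
    print(cpu_bound_workers(reserve=1))
```

The result could then be passed as the workers value of the corresponding StageConfig.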

Error Handling

  • Pipeline continues processing on errors - there is currently no DLQ (Dead-letter queue) or retry mechanism implemented.
  • Failed items are tracked separately in results
  • Use enable_tracking=True to get detailed error information
  • Check result.success and result.error for each item
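
Since there is no built-in retry mechanism, retries can be added on the handler side. The decorator below is a minimal sketch for async handlers; with_retries and flaky_fetch are hypothetical names, and whether a wrapped handler fits your pipeline should be verified against your own setup. It re-raises the last exception once all attempts are used up, so the item is still reported as failed in that case.

```python
import asyncio
import functools


def with_retries(attempts: int = 3, delay: float = 0.0):
    """Retry an async handler up to `attempts` times with a linear backoff."""
    def decorator(handler):
        @functools.wraps(handler)
        async def wrapper(item):
            for attempt in range(1, attempts + 1):
                try:
                    return await handler(item)
                except Exception:
                    if attempt == attempts:
                        raise  # out of attempts: let the error surface
                    await asyncio.sleep(delay * attempt)
        return wrapper
    return decorator


calls = {"count": 0}


@with_retries(attempts=3)
async def flaky_fetch(item: dict) -> dict:
    """Demo handler that fails twice before succeeding."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    item["ok"] = True
    return item


if __name__ == "__main__":
    print(asyncio.run(flaky_fetch({"id": 1})))
```

Handlers wrapped this way should be safe to run more than once for the same item, since a failed attempt may already have modified it.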

Monitoring and Statistics

The pipeline logs statistics by itself. If you don't want this, you can disable it by setting stats_interval_seconds in ETLPipelineConfig to 0 and check the pipeline statistics yourself using the get_pipeline_stats() function.

# Non-blocking start
await pipeline.start()

# Monitor while running
while pipeline.is_running():
    stats = await pipeline.get_pipeline_stats()
    print(f"Active: {stats['active_items']}, Completed: {stats['completed_items']}")
    print(f"Success rate: {stats['success_rate']:.2%}")
    await asyncio.sleep(1)

# Wait for completion
results = await pipeline.run()

After the pipeline has finished, you can check the results for per-stage durations and other data.

for result in results:
    print(f"Job {result.job_id}:")
    print(f"  Total: {result.total_duration:.2f}s")
    for stage_name in ["fetch", "parse", "compute", "store"]:
        duration = result.metadata.get_stage_duration(stage_name)
        if duration:
            print(f"  {stage_name}: {duration:.2f}s")

Additionally, each WorkItem carries metadata in the form of a WorkItemMetadata object. You can access it at runtime to check each work item's status. It's a good idea to keep a reference to work items after you submit them in your refill_queue() function in case you want to monitor them.

Complete Example

See example.py for a complete working example demonstrating all execution types and features.

Docker Support

An example Dockerfile is provided showing how to use Bergtalzug's base Docker image.
