Skip to main content

A Python library for creating flexible, chainable, and prioritized processing pipelines.

Project description

Lane2Lane

Lane2Lane is a Python library for creating flexible, chainable, and prioritized processing pipelines. It allows you to define sequential processing steps (lanes) that can be executed in a specific order with dependency relationships.

For detailed documentation, check out our Wiki.

Installation

pip install lane2lane
pip install git+https://github.com/Talisik/lane2lane.git

Requirements

  • Python 3.8+
  • fun-things
  • simple-chalk

Quick Start

from l2l import Lane, PrimaryLane

# Define a simple processing lane
class ProcessingLane(Lane):
    def process(self, value):
        processed_value = f"{value} - processed"
        yield processed_value

# Define a primary lane that uses the processing lane
class Main(PrimaryLane):
    lanes = {
        -10: ProcessingLane,  # Run ProcessingLane before this lane
    }

    def process(self, value):
        result = f"{value} - main"
        yield result

# Run the pipeline
results = Lane.start("MAIN")

# Process the results
for result in results:
    print(result)

Concepts

Lanes

A Lane is a processing unit that can transform or act on data. Lanes can be:

  • Primary Lanes: Entry points that can be directly executed
  • Regular Lanes: Processing stages that run as part of a lane chain

Lane Ordering

Lanes are executed in a specific order defined by:

  • Priority: Integer values that determine execution order
  • Before/After Relationships: Negative priorities run before, positive priorities run after

Basic Usage

Creating a Lane

from l2l import Lane

class MyLane(Lane):
    # Process data and optionally yield results
    def process(self, value):
        processed_value = do_something(value)
        yield processed_value

Creating a Primary Lane

Primary lanes are entry points for execution:

from l2l import PrimaryLane

class MyPrimaryLane(PrimaryLane):
    def process(self, value):
        # Process the input value
        result = transform_data(value)
        yield result

You can also override the 'primary' class method in a Lane class:

from l2l import Lane

class MyPrimaryLane(Lane):
    @classmethod
    def primary(cls) -> bool:
        return True  # This makes it a primary lane

    def process(self, value):
        # Process the input value
        result = transform_data(value)
        yield result

Defining Lane Order

Lanes can specify other lanes to run before and after them:

class MainLane(PrimaryLane):
    # Define lanes to run before and after this lane
    lanes = {
        -10: "PreprocessLane",   # Run PreprocessLane before this lane (higher negative priority runs first)
        -5: ValidationLane,      # Run ValidationLane after PreprocessLane but before this lane
        0: PostProcessLane,      # Run PostProcessLane after this lane
        10: CleanupLane,         # Run CleanupLane after PostProcessLane
        20: None,                # Use None to remove a lane at this priority
    }

    def process(self, value):
        # Process after PreprocessLane and ValidationLane
        # but before PostProcessLane and CleanupLane
        return transform_data(value)

The priority numbers determine the execution order:

  • Negative priorities: Lanes that run before this lane (more negative runs first)
  • Positive priorities: Lanes that run after this lane (higher positive runs first)

Running Lanes

# Start a specific primary lane
result = Lane.start("MAIN_LANE")

# Start all primary lanes that match a name
results = [*Lane.start("MAIN")]

Subscriber Example

Subscriber is a pre-defined lane class that provides a standard way to generate data. Rather than processing input from previous lanes, Subscriber lanes generate their own payloads:

from l2l import Subscriber

class DataSourceLane(Subscriber):
    def get_payloads(self, value):
        # Fetch data from some source
        data = fetch_data_from_source()
        for item in data:
            yield item

Instead of implementing process(), you only need to implement get_payloads() to define where your data comes from. The Subscriber class handles the rest automatically.

Advanced Features

Conditional Execution

Lanes can have conditions for execution:

class ConditionalLane(Lane):
    @classmethod
    def condition(cls, name: str):
        # Only run this lane if the name contains "SPECIAL"
        return "SPECIAL" in name

Custom Naming

Provide custom names or aliases for lanes:

class CustomNamedLane(Lane):
    @classmethod
    def name(cls) -> Iterable[str]:
        yield "CUSTOM_PROCESS"
        yield "PROCESSOR"  # An alias

Maximum Run Count

Limit how many times a lane can run:

class OneTimeLane(Lane):
    @classmethod
    def max_run_count(cls) -> int:
        return 1  # Run this lane only once

Process All Values

Control whether all items should be processed before passing to the next lane:

class BatchProcessingLane(Lane):
    process_all = True  # Process all items before passing to the next lane

    def process(self, value):
        # When process_all is True, all items will be processed by this lane
        # before any are passed to subsequent lanes
        yield processed_value

When process_all is False (default), each item is processed through the entire lane chain before the next item starts processing.

Terminating Lane Execution

You can manually terminate a lane's execution:

class TerminatingLane(Lane):
    def process(self, value):
        if some_condition:
            self.terminate()  # Stop processing this lane
            return

        yield processed_value

Multiprocessing Support

Lane2Lane supports multiprocessing for parallel data processing:

class ParallelProcessingLane(Lane):
    multiprocessing = True  # Enable multiprocessing for this lane

    def process(self, value):
        # Process data in parallel
        # Each yielded item will be processed by subsequent lanes
        yield processed_item

Error Handling

Lanes provide built-in error handling capabilities:

class ErrorHandlingLane(Lane):
    @classmethod
    def terminate_on_error(cls):
        return True  # Stop processing on error (default behavior)

    def process(self, value):
        try:
            # Process data
            yield processed_data
        except Exception as e:
            # Access errors with self.errors
            # Global errors available via Lane.global_errors()
            pass

Complete Example

Here's a complete example showing a data processing pipeline:

from l2l import Lane, PrimaryLane, Subscriber

# Data source that fetches records
class DataSourceLane(Subscriber):
    def get_payloads(self, value):
        data = [
            {"id": 1, "name": "Alice", "score": 85},
            {"id": 2, "name": "Bob", "score": 92},
            {"id": 3, "name": "Charlie", "score": 78},
        ]
        for item in data:
            yield item

# Validation lane
class ValidationLane(Lane):
    def process(self, value):
        if "id" not in value or "name" not in value:
            raise ValueError(f"Invalid data format: {value}")
        yield value

# Processing lane
class ScoreProcessingLane(Lane):
    def process(self, value):
        # Add grade based on score
        if "score" in value:
            if value["score"] >= 90:
                value["grade"] = "A"
            elif value["score"] >= 80:
                value["grade"] = "B"
            elif value["score"] >= 70:
                value["grade"] = "C"
            else:
                value["grade"] = "D"
        yield value

# Output formatting lane
class FormattingLane(Lane):
    def process(self, value):
        yield f"Student {value['name']} (ID: {value['id']}) - Score: {value['score']}, Grade: {value.get('grade', 'N/A')}"

# Main primary lane that orchestrates the pipeline
class StudentProcessingLane(PrimaryLane):
    lanes = {
        -30: DataSourceLane,       # First fetch the data
        -20: ValidationLane,       # Then validate it
        -10: ScoreProcessingLane,  # Then process scores
        0: FormattingLane,         # Finally format for output
    }

    # Note: No need to implement process() if you're just passing values through
    # The Lane class already handles this behavior by default

# Run the pipeline
results = Lane.start("STUDENT_PROCESSING")
for result in results:
    print(result)

Output:

Student Alice (ID: 1) - Score: 85, Grade: B
Student Bob (ID: 2) - Score: 92, Grade: A
Student Charlie (ID: 3) - Score: 78, Grade: C

License

MIT License

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

lane2lane-1.9.2.tar.gz (17.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

lane2lane-1.9.2-py3-none-any.whl (15.9 kB view details)

Uploaded Python 3

File details

Details for the file lane2lane-1.9.2.tar.gz.

File metadata

  • Download URL: lane2lane-1.9.2.tar.gz
  • Upload date:
  • Size: 17.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.10.17

File hashes

Hashes for lane2lane-1.9.2.tar.gz
Algorithm Hash digest
SHA256 4019835ff2202cd47bbd2372cf164dee4f1640d5df96a3809a189773e63042f7
MD5 29cc9f8a8f6d1a4c1f83420a864aa8e5
BLAKE2b-256 f81ca591912375a7ddb74075aef046ed779cca924ec15bd13388844779c5f528

See more details on using hashes here.

File details

Details for the file lane2lane-1.9.2-py3-none-any.whl.

File metadata

  • Download URL: lane2lane-1.9.2-py3-none-any.whl
  • Upload date:
  • Size: 15.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.10.17

File hashes

Hashes for lane2lane-1.9.2-py3-none-any.whl
Algorithm Hash digest
SHA256 d1e77748ff24d041daaced34d60b04814b361430a65ff128921d69e6899397ea
MD5 d49fa89af5fcd43feccf066bd0fed5f1
BLAKE2b-256 4162b3ba593ec68515b50e9ecf81daea80558ee29dad7cdda3d39ca9d4323587

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page