
A Python library for creating flexible, chainable, and prioritized processing pipelines.


Lane2Lane

Lane2Lane is a Python library for creating flexible, chainable, and prioritized processing pipelines. You define sequential processing steps (lanes), and each lane declares which other lanes run before or after it through integer priorities.

For detailed documentation, check out our Wiki.

Installation

pip install git+https://github.com/Talisik/lane2lane.git

Requirements

  • Python 3.8+
  • fun-things
  • simple-chalk

Quick Start

from l2l import Lane, PrimaryLane

# Define a simple processing lane
class ProcessingLane(Lane):
    def process(self, value):
        processed_value = f"{value} - processed"
        yield processed_value

# Define a primary lane that uses the processing lane
class Main(PrimaryLane):
    lanes = {
        -10: ProcessingLane,  # Run ProcessingLane before this lane
    }

    def process(self, value):
        result = f"{value} - main"
        yield result

# Run the pipeline
results = Lane.start("MAIN")

# Process the results
for result in results:
    print(result)
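Conceptually, each lane's process() is a generator, and each item it yields is passed on to the next lane in the chain. The sketch below models that flow with plain generator functions so it runs without l2l installed. run_chain, processing, and main are hypothetical names, and the real library's scheduling may differ.

```python
from typing import Any, Callable, Iterable, Iterator, List

# A stage takes one input value and yields zero or more output values,
# mirroring the shape of Lane.process() in the examples above.
Stage = Callable[[Any], Iterable[Any]]


def run_chain(stages: List[Stage], value: Any) -> Iterator[Any]:
    """Hypothetical helper: feed every item yielded by one stage into the next."""
    if not stages:
        yield value  # end of the chain: emit the final result
        return
    first, rest = stages[0], stages[1:]
    for item in first(value):
        yield from run_chain(rest, item)


def processing(value: Any) -> Iterator[str]:
    yield f"{value} - processed"


def main(value: Any) -> Iterator[str]:
    yield f"{value} - main"


for result in run_chain([processing, main], "input"):
    print(result)  # prints "input - processed - main"
```

Because run_chain recurses per item, a stage that yields several items fans out, and each item travels through the remaining stages independently.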

Concepts

Lanes

A Lane is a processing unit that can transform or act on data. Lanes can be:

  • Primary Lanes: Entry points that can be directly executed
  • Regular Lanes: Processing stages that run as part of a lane chain

Lane Ordering

Lanes are executed in a specific order defined by:

  • Priority: Integer values that determine execution order (lower values run first)
  • Before/After Relationships: Negative priorities run before the lane that declares them; zero and positive priorities run after it

Basic Usage

Creating a Lane

from l2l import Lane

class MyLane(Lane):
    # Process data and optionally yield results
    def process(self, value):
        processed_value = do_something(value)  # do_something() stands in for your own logic
        yield processed_value

Creating a Primary Lane

Primary lanes are entry points for execution:

from l2l import PrimaryLane

class MyPrimaryLane(PrimaryLane):
    def process(self, value):
        # Process the input value (transform_data is a placeholder for your own logic)
        result = transform_data(value)
        yield result

Alternatively, turn a regular Lane into a primary lane by overriding its 'primary' class method:

from l2l import Lane

class MyPrimaryLane(Lane):
    @classmethod
    def primary(cls) -> bool:
        return True  # This makes it a primary lane

    def process(self, value):
        # Process the input value
        result = transform_data(value)
        yield result

Defining Lane Order

Lanes specify other lanes to run before and after them through a lanes dictionary that maps each integer priority to a lane class or a lane name:

from l2l import PrimaryLane

class MainLane(PrimaryLane):
    # Define lanes to run before and after this lane
    # (ValidationLane, PostProcessLane, and CleanupLane are Lane subclasses defined elsewhere)
    lanes = {
        -10: "PreprocessLane",   # Run PreprocessLane before this lane (the lowest priority runs first)
        -5: ValidationLane,      # Run ValidationLane after PreprocessLane but before this lane
        0: PostProcessLane,      # Run PostProcessLane after this lane
        10: CleanupLane,         # Run CleanupLane after PostProcessLane
        20: None,                # Use None to remove a lane at this priority
    }

    def process(self, value):
        # Runs after PreprocessLane and ValidationLane
        # but before PostProcessLane and CleanupLane
        yield transform_data(value)
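To make the ordering rules concrete, here is a toy function that derives an execution order from a lanes dictionary like the one above. It assumes that lower priorities run first, that negative priorities run before the declaring lane while zero and positive ones run after it, and that None entries are dropped. execution_order is a hypothetical helper, not part of l2l.

```python
from typing import Dict, List, Optional, Union

# A lane can be referenced by class or by name, as in the lanes dictionary above.
LaneRef = Union[str, type]


def execution_order(primary: str, lanes: Dict[int, Optional[LaneRef]]) -> List[str]:
    """Toy model: negative priorities before the primary lane, zero and
    positive priorities after it, each group sorted ascending."""

    def label(ref: LaneRef) -> str:
        return ref if isinstance(ref, str) else ref.__name__

    # Entries set to None are treated as removed.
    active = {p: ref for p, ref in lanes.items() if ref is not None}
    before = [label(active[p]) for p in sorted(active) if p < 0]
    after = [label(active[p]) for p in sorted(active) if p >= 0]
    return before + [primary] + after


print(execution_order("MainLane", {
    -10: "PreprocessLane",
    -5: "ValidationLane",
    0: "PostProcessLane",
    10: "CleanupLane",
    20: None,
}))
# prints ['PreprocessLane', 'ValidationLane', 'MainLane', 'PostProcessLane', 'CleanupLane']
```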

Running Lanes

# Start a specific primary lane
result = Lane.start("MAIN_LANE")

# Start all primary lanes that match a name
results = Lane.start_all("MAIN")

Subscriber Example

Subscriber is a pre-defined lane class that provides a standard way to generate data. Rather than processing input from previous lanes, Subscriber lanes generate their own payloads:

from l2l import Subscriber

class DataSourceLane(Subscriber):
    def get_payloads(self, value):
        # Fetch data from some source
        data = fetch_data_from_source()
        for item in data:
            yield item

Instead of implementing process(), you only need to implement get_payloads() to define where your data comes from. The Subscriber class handles the rest automatically.
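The relationship between process() and get_payloads() can be pictured with a small toy hierarchy. ToyLane, ToySubscriber, and ListSource are hypothetical classes written for illustration; they assume a Subscriber's process() simply yields each payload from get_payloads(), which may not match the library's actual implementation.

```python
from typing import Any, Iterable, Iterator, List


class ToyLane:
    """Hypothetical stand-in for a lane: process() yields output items."""

    def process(self, value: Any) -> Iterator[Any]:
        yield value  # pass the value through unchanged by default


class ToySubscriber(ToyLane):
    """Hypothetical Subscriber model: process() delegates to get_payloads()."""

    def get_payloads(self, value: Any) -> Iterable[Any]:
        raise NotImplementedError  # subclasses define where data comes from

    def process(self, value: Any) -> Iterator[Any]:
        yield from self.get_payloads(value)


class ListSource(ToySubscriber):
    """Generates payloads from an in-memory list instead of an external source."""

    def __init__(self, items: List[Any]) -> None:
        self.items = items

    def get_payloads(self, value: Any) -> Iterable[Any]:
        return iter(self.items)
```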

Advanced Features

Conditional Execution

Lanes can have conditions for execution:

class ConditionalLane(Lane):
    @classmethod
    def condition(cls, name: str):
        # Only run this lane if the name contains "SPECIAL"
        return "SPECIAL" in name
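A minimal sketch of how condition() could be used to filter lanes, assuming name is the name the pipeline was started with. AlwaysLane, SpecialOnlyLane, and lanes_for are hypothetical; the library's real selection logic may differ.

```python
from typing import Iterable, List


class AlwaysLane:
    @classmethod
    def condition(cls, name: str) -> bool:
        return True  # runs for every name


class SpecialOnlyLane:
    @classmethod
    def condition(cls, name: str) -> bool:
        return "SPECIAL" in name  # same rule as ConditionalLane above


def lanes_for(name: str, candidates: Iterable[type]) -> List[type]:
    """Hypothetical filter: keep only lanes whose condition() accepts this name."""
    return [lane for lane in candidates if lane.condition(name)]
```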

Custom Naming

Provide custom names or aliases for lanes:

from typing import Iterable

from l2l import Lane

class CustomNamedLane(Lane):
    @classmethod
    def name(cls) -> Iterable[str]:
        yield "CUSTOM_PROCESS"
        yield "PROCESSOR"  # An alias
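Because name() yields every name a lane answers to, a lookup table can map each alias to the same class. The registry below is a hypothetical illustration (build_registry and OtherLane are not part of l2l); in this model, both CUSTOM_PROCESS and PROCESSOR resolve to CustomNamedLane.

```python
from typing import Dict, Iterable, List


class CustomNamedLane:
    @classmethod
    def name(cls) -> Iterable[str]:
        yield "CUSTOM_PROCESS"
        yield "PROCESSOR"  # An alias


class OtherLane:
    @classmethod
    def name(cls) -> Iterable[str]:
        yield "OTHER"


def build_registry(lanes: List[type]) -> Dict[str, type]:
    """Hypothetical registry mapping every yielded name to its lane class."""
    registry: Dict[str, type] = {}
    for lane in lanes:
        for alias in lane.name():
            registry[alias] = lane
    return registry
```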

Maximum Run Count

Limit how many times a lane can run:

class OneTimeLane(Lane):
    @classmethod
    def max_run_count(cls) -> int:
        return 1  # Run this lane only once
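One way to picture max_run_count() is a per-lane counter checked before each run. RunLimiter is a hypothetical guard, and treating None as "no limit" is an assumption of this sketch, not documented library behavior.

```python
from collections import Counter
from typing import Any, Callable, Optional


class RunLimiter:
    """Hypothetical guard that refuses to run a lane more than max_runs times."""

    def __init__(self) -> None:
        self.counts: Counter = Counter()  # runs performed so far, keyed by lane

    def try_run(self, key: str, max_runs: Optional[int], fn: Callable[[], Any]) -> bool:
        if max_runs is not None and self.counts[key] >= max_runs:
            return False  # limit reached: skip this run
        self.counts[key] += 1
        fn()
        return True
```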

Multiprocessing Support

Lane2Lane supports multiprocessing for parallel data processing:

class ParallelProcessingLane(Lane):
    multiprocessing = True  # Enable multiprocessing for this lane

    def process(self, value):
        # Process data in parallel
        # Each yielded item will be processed by subsequent lanes
        processed_item = f"{value} - processed"
        yield processed_item
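How l2l implements multiprocessing = True isn't described here. To illustrate the general idea of handing each yielded item to a pool of workers, here is a sketch built on the standard library's concurrent.futures; fan_out and double are hypothetical, and this is not the library's mechanism.

```python
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Any, Callable, Iterable, List


def fan_out(items: Iterable[Any], stage: Callable[[Any], Any], executor: Executor) -> List[Any]:
    """Apply the next stage to every item in parallel; map() keeps input order."""
    return list(executor.map(stage, items))


def double(x: int) -> int:
    return x * 2


if __name__ == "__main__":
    # Threads keep this sketch portable. For CPU-bound work a ProcessPoolExecutor
    # can be passed instead, as long as the stage function is picklable.
    with ThreadPoolExecutor(max_workers=4) as pool:
        print(fan_out(range(5), double, pool))  # prints [0, 2, 4, 6, 8]
```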

Error Handling

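Lanes provide built-in error handling. terminate_on_error() controls whether an error stops processing, and errors can be inspected through self.errors on a lane or globally through Lane.global_errors():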

class ErrorHandlingLane(Lane):
    @classmethod
    def terminate_on_error(cls):
        return True  # Stop processing on error (default behavior)

    def process(self, value):
        try:
            # Process data (transform_data is a placeholder for your own logic)
            processed_data = transform_data(value)
            yield processed_data
        except Exception as e:
            # Access errors with self.errors
            # Global errors available via Lane.global_errors()
            pass
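The effect of terminate_on_error() can be modeled with a small runner that records every exception and either stops at the first failure or keeps going. run_items and reciprocal are hypothetical; the errors list only loosely mirrors self.errors, whose exact contents the library defines.

```python
from typing import Any, Callable, Iterable, List, Tuple


def run_items(
    items: Iterable[Any],
    stage: Callable[[Any], Any],
    terminate_on_error: bool = True,
) -> Tuple[List[Any], List[Exception]]:
    """Hypothetical runner: record each exception; stop at the first one when
    terminate_on_error is True, otherwise skip the failing item and continue."""
    results: List[Any] = []
    errors: List[Exception] = []
    for item in items:
        try:
            results.append(stage(item))
        except Exception as e:
            errors.append(e)
            if terminate_on_error:
                break
    return results, errors


def reciprocal(x: float) -> float:
    return 1 / x  # raises ZeroDivisionError for 0
```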

Complete Example

Here's a complete example showing a data processing pipeline:

from l2l import Lane, PrimaryLane, Subscriber

# Data source that fetches records
class DataSourceLane(Subscriber):
    def get_payloads(self, value):
        data = [
            {"id": 1, "name": "Alice", "score": 85},
            {"id": 2, "name": "Bob", "score": 92},
            {"id": 3, "name": "Charlie", "score": 78},
        ]
        for item in data:
            yield item

# Validation lane
class ValidationLane(Lane):
    def process(self, value):
        if "id" not in value or "name" not in value:
            raise ValueError(f"Invalid data format: {value}")
        yield value

# Processing lane
class ScoreProcessingLane(Lane):
    def process(self, value):
        # Add grade based on score
        if "score" in value:
            if value["score"] >= 90:
                value["grade"] = "A"
            elif value["score"] >= 80:
                value["grade"] = "B"
            elif value["score"] >= 70:
                value["grade"] = "C"
            else:
                value["grade"] = "D"
        yield value

# Output formatting lane
class FormattingLane(Lane):
    def process(self, value):
        yield f"Student {value['name']} (ID: {value['id']}) - Score: {value['score']}, Grade: {value.get('grade', 'N/A')}"

# Main primary lane that orchestrates the pipeline
class StudentProcessingLane(PrimaryLane):
    lanes = {
        -30: DataSourceLane,       # First fetch the data
        -20: ValidationLane,       # Then validate it
        -10: ScoreProcessingLane,  # Then process scores
        0: FormattingLane,         # Finally format for output
    }

    # Note: No need to implement process() if you're just passing values through
    # The Lane class already handles this behavior by default

# Run the pipeline
results = Lane.start("STUDENT_PROCESSING")
for result in results:
    print(result)

Output:

Student Alice (ID: 1) - Score: 85, Grade: B
Student Bob (ID: 2) - Score: 92, Grade: A
Student Charlie (ID: 3) - Score: 78, Grade: C

License

MIT License



Download files


Source Distribution

lane2lane-1.5.7.tar.gz (16.8 kB)

Uploaded Source

Built Distribution


lane2lane-1.5.7-py3-none-any.whl (14.9 kB)

Uploaded Python 3

File details

Details for the file lane2lane-1.5.7.tar.gz.

File metadata

  • Download URL: lane2lane-1.5.7.tar.gz
  • Upload date:
  • Size: 16.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.10.16

File hashes

Hashes for lane2lane-1.5.7.tar.gz

  • SHA256: 69f3e3b454f8bf56b74de47fc521635e09b3755261a9f6eed74b60bf948dd50f
  • MD5: 93e3385771403979180f0b4357696357
  • BLAKE2b-256: 6c4817d1af75ccd3a9aa61dc58a95b304687f4eb7dbb6eac06d74bdefeada960


File details

Details for the file lane2lane-1.5.7-py3-none-any.whl.

File metadata

  • Download URL: lane2lane-1.5.7-py3-none-any.whl
  • Upload date:
  • Size: 14.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.10.16

File hashes

Hashes for lane2lane-1.5.7-py3-none-any.whl

  • SHA256: c61dd91ee8f91cc57d1c0a1f4536fc112d46ca1b4668afa33a743583d60bab27
  • MD5: f7ff2d3828d037b90ec802886e2f9f41
  • BLAKE2b-256: f89eeee263e87619dbf0948285a7883b06dd3a9091deff887ea0b8ad70102ba9

