
Signal processing execution graph using DAG and data classes


SigExec - Signal Processing Chain Framework

A Python framework for building signal processing graphs with port-based data flow and parameter exploration.

SigExec provides the framework - you bring the blocks! The included radar processing blocks are examples showing how to use the framework. You can easily create your own custom blocks for any signal processing application.


Features

  • Port-Based Data Flow: Natural data flow through named ports - operations read/write exactly what they need
  • Data Class Blocks: Type-safe, composable processing blocks using Python dataclasses
  • Parameter Exploration: Built-in support for exploring parameter combinations with .variant()
  • Graph Visualization: Visualize operation sequences and variant combinations
  • Extensible: Create custom blocks as simple dataclasses - no complex interfaces required
  • Functional Composition: Chain operations naturally with consistent input/output types
  • Example Application: Complete radar processing demonstrating:
    • LFM signal generation with delay and Doppler shift
    • Pulse stacking
    • Matched filtering (range compression)
    • FFT processing (Doppler compression)
    • Range-Doppler map visualization

Installation

From Source

git clone https://github.com/briday1/sigexec.git
cd sigexec
pip install -e .

Requirements

  • Python >= 3.7
  • numpy >= 1.20.0
  • scipy >= 1.7.0
  • matplotlib >= 3.3.0

Quick Start

Simplest Example - Direct Chaining

The cleanest approach, where each block is a configured data class:

```python
from sigexec.blocks import LFMGenerator, StackPulses, RangeCompress, DopplerCompress

# Configure blocks
gen = LFMGenerator(num_pulses=128, target_delay=20e-6, target_doppler=1000.0)
stack = StackPulses()
range_comp = RangeCompress()
doppler_comp = DopplerCompress(window='hann')

# GraphData object flows through operations
gdata = gen()                    # Generate signal
gdata = stack(gdata)             # Stack pulses
gdata = range_comp(gdata)        # Range compression
gdata = doppler_comp(gdata)      # Doppler compression

# Result is a range-doppler map!
range_doppler_map = gdata.data
```

Using Graph for Better Organization

from sigexec import Graph
from sigexec.blocks import LFMGenerator, StackPulses, RangeCompress, DopplerCompress

# Build graph with fluent interface
result = (Graph("Radar")
    .add(LFMGenerator(num_pulses=128, target_delay=20e-6, target_doppler=1000.0))
    .add(StackPulses())
    .add(RangeCompress())
    .add(DopplerCompress(window='hann'))
    .run(verbose=True)
)

# Access the range-doppler map
rdm = result.data

Parameter Exploration with Variants

# Explore different window functions
graph = (Graph("Radar")
    .add(LFMGenerator(num_pulses=128))
    .add(StackPulses())
    .add(RangeCompress())
    .variant(lambda w: DopplerCompress(window=w),
             configs=['hann', 'hamming', 'blackman'],
             names=['Hann', 'Hamming', 'Blackman'])
)

# Run all variants
results = graph.run()

# Results is a list of (params, result) tuples
for params, result in results:
    print(f"Window: {params['variant'][0]}")

Branching and Merging

Use branches when you want to run multiple parallel processing paths that may use identical port names. After processing, use .merge() with a custom merge function to combine branch outputs into a single GraphData.

The merge function receives a BranchesView (ordered) which supports both name-based (branches['name']) and index-based (branches[0]) access so blocks don't need to know the branch names.

# Example: compare two branches and merge their outputs
import numpy as np

from sigexec import Graph, GraphData
from sigexec.blocks import LFMGenerator, StackPulses, RangeCompress, DopplerCompress

def compare_merge(branches):
    # index-based access is convenient and ordered
    a = branches[0].data
    b = branches[1].data

    out = GraphData()
    out.data = np.concatenate([a, b])
    out.set('compared', True)
    return out

graph = (Graph("CompareWindows")
    .add(LFMGenerator())
    .add(StackPulses())
    .add(RangeCompress())
    .branch(['hann', 'hamming'])
    .add(DopplerCompress(window='hann'), branch='hann')
    .add(DopplerCompress(window='hamming'), branch='hamming')
    .merge(compare_merge, branches=['hann', 'hamming'])
)

result = graph.run(GraphData())
print(result.get('compared'))
The variant graph built above in Parameter Exploration with Variants can also print a text summary of its structure:

# Visualize the graph structure
print(graph.visualize())
# Shows:
#   Graph: Radar
#   1. Op0
#   2. Op1
#   3. Op2
#   4. VARIANT: variants
#      ├─ Hann
#      ├─ Hamming
#      ├─ Blackman
#   Total operations: 4
#   Variant combinations: 3
#   Note: Each variant executes with its own isolated GraphData

Visualizing Graphs

SigExec can generate Mermaid diagrams to visualize your processing flow without running it:

from sigexec import Graph
from sigexec.blocks import LFMGenerator, StackPulses, RangeCompress, DopplerCompress

# Build a graph
graph = (Graph("Radar Processing")
    .add(LFMGenerator(num_pulses=128), name="Generate_LFM")
    .add(StackPulses(), name="Stack_Pulses")
    .add(RangeCompress(), name="Range_Compress")
    .add(DopplerCompress(window='hann'), name="Doppler_Compress"))

# Get Mermaid diagram
print(graph.to_mermaid())

# Or save to file (renders in VS Code, GitHub, etc.)
graph.visualize("radar_flow.md")

This generates a flowchart showing the complete processing pipeline with all operations, branches, and merges.

Inspecting Graphs

New Feature: You can inspect the graph structure before running it using the same logic that executes the graph:

import numpy as np
from sigexec import GraphData

# Create sample input for port analysis
sample = GraphData()
sample.data = np.array([1, 2, 3])
sample.sample_rate = 1e6

# Inspect as structured data
info = graph.inspect(sample, format='dict')
print(f"Graph has {len(info['nodes'])} nodes and {len(info['edges'])} edges")

for edge in info['edges']:
    print(f"{edge['from']} -> {edge['to']} (ports: {edge['ports']})")

# Inspect as Mermaid diagram
mermaid = graph.inspect(sample, format='mermaid')
print(mermaid)  # Shows flowchart with port information

Why inspect?

  • Same logic as execution: The inspection uses the exact same graph walking logic as run(), ensuring what you see is what will execute
  • See before you run: Understand data flow, branches, and port usage without actually executing operations
  • Debug graph structure: Verify branches are created correctly and merge points are where you expect

The key difference from visualize(): inspect() uses the exact same unified graph walker as execution, just in inspection mode instead of execution mode. This guarantees the visualization matches actual runtime behavior.

Visualizing Branches

graph = (Graph("Branched Processing")
    .add(generate_data, name="Generate")
    .branch(["filter_a", "filter_b"])
    .add(filter_a, branch="filter_a", name="Filter_A")
    .add(filter_b, branch="filter_b", name="Filter_B")
    .merge(merge_fn, branches=["filter_a", "filter_b"], name="Merge"))

# Shows branches as separate paths with dotted lines
graph.visualize("branched_flow.md")

Visualizing Variants

graph = (Graph("Variant Exploration")
    .add(load_data, name="Load")
    .variant(lambda w: apply_window(w),
             ['hamming', 'hann', 'blackman'],
             names=['Hamming', 'Hann', 'Blackman']))

# Shows variant node with configuration options
graph.visualize("variants_flow.md")

Port Optimization (Default Behavior)

By default (optimize_ports=True), operations only receive the ports they actually use. Unused ports "bypass" operations entirely, improving memory efficiency and making data flow explicit.

Visual Comparison: Port Flow

Here's a graph that demonstrates the difference:

from sigexec import Graph, GraphData, requires_ports

def source(g):
    g.a = 10  # Creates port 'a'
    g.b = 20  # Creates port 'b'
    return g

@requires_ports('a')
def use_a(g):
    g.result_a = g.a * 2
    return g

@requires_ports('b')
def use_b(g):
    g.result_b = g.b * 3
    return g

@requires_ports('result_a', 'result_b')
def combine(g):
    g.final = g.result_a + g.result_b
    return g

graph = (Graph("Port Demo")
    .add(source, name='source')
    .add(use_a, name='use_a')
    .add(use_b, name='use_b')
    .add(combine, name='combine'))

WITHOUT Port Optimization (optimize_ports=False):

All ports flow through every operation (wasteful):

```mermaid
flowchart LR
    source([source]) -->|a, b| use_a[use_a]
    use_a -->|a, b, result_a| use_b[use_b]
    use_b -->|a, b, result_a, result_b| combine[combine]
    combine --> final([final])
```
  • use_a receives [a, b] but only uses a
  • use_b receives [a, b, result_a] but only uses b
  • Ports b, a, and result_a are unnecessarily copied

WITH Port Optimization (optimize_ports=True - Default):

Only needed ports flow (efficient):

```mermaid
flowchart LR
    source([source]) -->|a| use_a[use_a]
    source -. b .-> use_b
    use_a -->|result_a| combine[combine]
    use_a -. b .-> use_b
    use_b[use_b] -->|b, result_b| combine
    use_b -. result_a .-> combine
    combine --> final([final])
```
  • use_a receives ONLY [a] ✓ (solid line)
  • Port b bypasses use_a (dotted line) ✓
  • use_b receives ONLY [b] ✓ (solid line)
  • Ports a and result_a bypass use_b (dotted lines) ✓
  • combine receives all needed ports ✓

Key:

  • Solid arrows (-->): Ports that flow to and are used by the operation
  • Dotted arrows (-.->): Ports that bypass the operation (not needed)

Run with verbose=True to see the actual port flow at runtime:

# See which ports each operation actually receives
result = graph.run(GraphData(), verbose=True)

Benefits of port optimization:

  • ✓ Memory efficient: Only copy ports that are used
  • ✓ Explicit data flow: Clear which ports each operation needs
  • ✓ Implicit branching: Operations using different ports naturally create parallel paths
  • ✓ Backwards compatible: Set optimize_ports=False for old behavior

See examples/port_optimization_visual_demo.py and examples/mermaid_port_comparison.py for detailed comparisons.

Running Examples

# Publish all demos to docs/ (for GitHub Pages)
python examples/publish_demos.py

# Or run individual demos (publishes to staticdash/)
python examples/radar_processing_demo.py
python examples/custom_blocks_demo.py
python examples/parameter_exploration_demo.py
python examples/post_processing_demo.py
python examples/input_variants_demo.py

Architecture

Core Components

GraphData

A data class that wraps signal arrays with metadata:

from dataclasses import dataclass
from typing import Any, Dict

import numpy as np

@dataclass
class GraphData:
    data: np.ndarray          # Signal data
    sample_rate: float        # Sampling rate
    metadata: Dict[str, Any]  # Additional information

Key Point: Every processing block takes GraphData as input and returns GraphData as output, enabling clean composition.

Data Class Blocks (Recommended)

Modern, clean blocks implemented as dataclasses:

from sigexec.blocks import LFMGenerator, StackPulses, RangeCompress, DopplerCompress

# Configure blocks with parameters
gen = LFMGenerator(num_pulses=128, target_delay=20e-6)
stack = StackPulses()
compress = RangeCompress()

# Call them directly - each returns GraphData
signal = gen()
signal = stack(signal)
signal = compress(signal)

Available data class blocks:

  • LFMGenerator - Generate LFM radar signals
  • StackPulses - Organize pulses into 2D matrix
  • RangeCompress - Matched filtering for range compression
  • DopplerCompress - FFT-based Doppler processing
  • ToMagnitudeDB - Convert to dB scale
  • Normalize - Normalize signal data
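
The last two are simple elementwise utilities whose math can be sketched directly. A hedged illustration (the 20·log10 convention, the epsilon floor, and peak normalization are assumptions about the real blocks, which operate on GraphData rather than bare arrays):

```python
import numpy as np

def to_magnitude_db(x: np.ndarray, floor: float = 1e-12) -> np.ndarray:
    """Magnitude in dB, with a small floor so log10 never sees zero."""
    return 20.0 * np.log10(np.maximum(np.abs(x), floor))

def normalize(x: np.ndarray) -> np.ndarray:
    """Scale so the largest magnitude is 1 (no-op on an all-zero array)."""
    peak = np.max(np.abs(x))
    return x / peak if peak > 0 else x

vals = normalize(np.array([1.0, 10.0, 100.0]))   # [0.01, 0.1, 1.0]
db = to_magnitude_db(vals)                       # [-40., -20., 0.]
```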

Graph

Manages execution with fluent interface:

graph = (Graph("MyPipeline")
    .add(block1)
    .add(block2)
    .add(block3)
    .run()
)

Processing Blocks

All blocks follow the pattern: GraphData → Block → GraphData

LFMGenerator

Generates LFM radar signals with configurable parameters:

  • Pulse duration and bandwidth
  • Target delay and Doppler shift
  • Noise characteristics
gen = LFMGenerator(
    num_pulses=128,
    pulse_duration=10e-6,
    bandwidth=5e6,
    target_delay=20e-6,
    target_doppler=1000.0
)
signal = gen()  # Returns GraphData

StackPulses

Organizes pulses into a 2D matrix for coherent processing.
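
The stacking operation can be pictured as a reshape from a 1-D stream of concatenated pulse returns into a pulses × samples matrix. A minimal sketch of the idea, not SigExec's actual implementation (the function name and row-per-pulse layout are assumptions):

```python
import numpy as np

def stack_pulses(stream: np.ndarray, num_pulses: int) -> np.ndarray:
    """Reshape a 1-D stream of concatenated pulse returns into a
    (num_pulses, samples_per_pulse) matrix for coherent processing."""
    samples_per_pulse = stream.size // num_pulses
    return stream[:num_pulses * samples_per_pulse].reshape(num_pulses, samples_per_pulse)

stream = np.arange(12.0)                     # 4 pulses x 3 samples, flattened
matrix = stack_pulses(stream, num_pulses=4)  # shape (4, 3); row i is pulse i
```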

RangeCompress

Performs range compression using matched filtering:

  • Correlates received signal with transmitted waveform
  • Improves SNR and range resolution
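
In NumPy terms, matched filtering multiplies each pulse's spectrum by the conjugate spectrum of the transmitted waveform, which is circular cross-correlation. A hedged sketch of the technique (illustrative only; SigExec's block may differ in normalization and padding):

```python
import numpy as np

def range_compress(pulses: np.ndarray, waveform: np.ndarray) -> np.ndarray:
    """Matched-filter each row (one pulse per row) against the transmitted
    waveform via FFT-based circular cross-correlation."""
    n = pulses.shape[1]
    wf_spectrum = np.fft.fft(waveform, n)
    return np.fft.ifft(np.fft.fft(pulses, axis=1) * np.conj(wf_spectrum), axis=1)

# A delayed copy of the waveform compresses to a sharp peak at the delay bin
t = np.linspace(0.0, 1.0, 16)
waveform = np.exp(1j * np.pi * 8 * t ** 2)        # toy chirp
pulse = np.roll(np.pad(waveform, (0, 48)), 10)    # echo delayed by 10 samples
compressed = range_compress(pulse[None, :], waveform)
peak = int(np.argmax(np.abs(compressed[0])))      # peak lands at bin 10
```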

DopplerCompress

Performs Doppler compression using FFT:

  • FFT along pulse dimension
  • Windowing for sidelobe reduction
  • Generates Range-Doppler map
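
The core of Doppler compression is a windowed FFT along the slow-time (pulse) axis, which converts pulse-to-pulse phase progression into a peak at the target's Doppler bin. A hedged sketch, assuming pulses along axis 0 (SigExec's axis convention and window handling may differ):

```python
import numpy as np

def doppler_compress(pulses: np.ndarray, window: str = 'hann') -> np.ndarray:
    """Apply a slow-time window and FFT across the pulse dimension (axis 0),
    producing a range-Doppler map with zero Doppler centered."""
    windows = {'hann': np.hanning, 'hamming': np.hamming, 'blackman': np.blackman}
    win = windows[window](pulses.shape[0])
    return np.fft.fftshift(np.fft.fft(pulses * win[:, None], axis=0), axes=0)

# A pure phase ramp across 32 pulses peaks at its Doppler bin (5 -> index 21)
num_pulses, doppler_bin = 32, 5
ramp = np.exp(2j * np.pi * doppler_bin * np.arange(num_pulses) / num_pulses)
rdm = doppler_compress(np.tile(ramp[:, None], (1, 8)))
peak = int(np.argmax(np.abs(rdm[:, 0])))   # num_pulses // 2 + doppler_bin == 21
```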

Example Output

The radar examples produce Range-Doppler maps showing:

  • 2D visualization: Range vs Doppler frequency with intensity showing target returns
  • Target detection: Clear peak at expected range (~3 km) and Doppler (~1 kHz)
  • Noise floor: Background noise visible across the map

Project Structure

sigexec/
├── sigexec/
│   ├── __init__.py
│   ├── core/
│   │   ├── __init__.py
│   │   ├── data.py          # GraphData class
│   │   └── graph.py         # Graph with fluent interface
│   └── blocks/
│       ├── __init__.py
│       └── functional.py    # Functional processing blocks
├── examples/
│   ├── radar_processing_demo.py
│   ├── custom_blocks_demo.py
│   ├── parameter_exploration_demo.py
│   ├── post_processing_demo.py
│   ├── input_variants_demo.py
│   ├── memoization_demo.py
│   └── publish_demos.py
├── tests/
│   └── test_sigexec.py
├── docs/
│   └── [Generated demo pages]
├── pyproject.toml
└── README.md

Usage Patterns

Pattern 1: Direct Chaining (Cleanest)

# Configure data class blocks
gen = LFMGenerator(num_pulses=128, target_delay=20e-6)
stack = StackPulses()
compress_range = RangeCompress()
compress_doppler = DopplerCompress()

# Single object flows through
signal = gen()
signal = stack(signal)
signal = compress_range(signal)
signal = compress_doppler(signal)

Pattern 2: Graph Builder

result = (Graph("Radar")
    .add(LFMGenerator(num_pulses=128))
    .add(StackPulses())
    .add(RangeCompress())
    .add(DopplerCompress())
    .tap(lambda sig: print(f"Shape: {sig.data.shape}"))  # Inspect
    .run(verbose=True)
)

Pattern 3: Functional Composition

# Compose operations functionally
process = lambda sig: DopplerCompress()(RangeCompress()(StackPulses()(sig)))
result = process(LFMGenerator()())

Creating Custom Blocks

SigExec is designed to be extended! The included radar blocks are examples - create your own blocks for any domain:

from dataclasses import dataclass
from sigexec import GraphData

@dataclass
class MyCustomBlock:
    """My custom processing block."""
    
    param1: float = 1.0
    param2: str = 'default'
    
    def __call__(self, signal_data: GraphData) -> GraphData:
        """Process the signal."""
        processed_data = your_algorithm(signal_data.data, self.param1)
        
        metadata = signal_data.metadata.copy()
        metadata['my_processing'] = True
        
        return GraphData(
            data=processed_data,
            sample_rate=signal_data.sample_rate,
            metadata=metadata
        )

# Use it with built-in or other custom blocks
my_block = MyCustomBlock(param1=2.5)
result = my_block(input_signal)

Distributing Custom Blocks

You can create and distribute your own block packages:

# Your package: my_signal_blocks
from sigexec import Graph
from my_signal_blocks import CustomFilter, CustomTransform

result = (Graph("MyPipeline")
    .add(CustomFilter(cutoff=1000))
    .add(CustomTransform(mode='advanced'))
    .run()
)


Documentation

  • CUSTOM_BLOCKS.md - Guide to creating and distributing custom blocks
  • examples/ - Working examples with different patterns
  • tests/ - Unit tests for all components

Design Philosophy

  1. Framework First: SigExec provides the framework; you provide the blocks
  2. Type Safety: Same type (GraphData) throughout the graph
  3. Composability: Blocks can be combined in any order
  4. Extensibility: Easy to create and distribute custom blocks
  5. Clarity: Configuration separate from execution
  6. Immutability: Each block returns new data
  7. Simplicity: Minimal API surface, maximum flexibility

Extensibility

The radar processing blocks included in sigexec.blocks are examples demonstrating the framework. The framework is designed to support:

  • Any signal processing domain: Audio, video, communications, radar, medical imaging, etc.
  • Custom block packages: Distribute your blocks as separate Python packages
  • Third-party blocks: Use blocks from other packages with full framework integration
  • Domain-specific graphs: Build specialized processing chains for your application

See CUSTOM_BLOCKS.md for a complete guide on creating and distributing custom blocks.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is open source and available under the MIT License.

Acknowledgments

This framework demonstrates fundamental radar signal processing concepts and serves as a foundation for building more complex signal processing graphs.



Download files


Source Distribution

sigexec-2026.32.tar.gz (44.4 kB)


Built Distribution


sigexec-2026.32-py3-none-any.whl (32.9 kB)


File details

Details for the file sigexec-2026.32.tar.gz.

File metadata

  • Download URL: sigexec-2026.32.tar.gz
  • Upload date:
  • Size: 44.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.14

File hashes

Hashes for sigexec-2026.32.tar.gz:

  • SHA256: 8b531af7c9875a78262bd8a341bac509e9c1dcca95c7f04db24d1f713dc6ddb4
  • MD5: 1b1af2177d80ebef7730f06191d1cddb
  • BLAKE2b-256: 01d395151f016640b4173ac34a875679fc93ef2d876bf612fc70f8f2cd9a20f4


File details

Details for the file sigexec-2026.32-py3-none-any.whl.

File metadata

  • Download URL: sigexec-2026.32-py3-none-any.whl
  • Upload date:
  • Size: 32.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.14

File hashes

Hashes for sigexec-2026.32-py3-none-any.whl:

  • SHA256: e5cea671c427cb2cf3dcb3fa40d99e796b7cf66ddf8c1e20fe260b0a664eac36
  • MD5: c63ecb38d4b5368487f26729905df5d9
  • BLAKE2b-256: 0eea5068a7b7c0dbd5afc006342c915cb0fda924320920d334ea318db997358c

