Skip to main content

Signal processing execution graph using DAG and data classes

Project description

sigexec

Signal processing execution graphs with efficient port-based data flow.

GitHub Pages

Features

  • Efficient Execution: Static analysis determines optimal port routing - operations receive only the data they need
  • Flow Visualization: Automatic mermaid diagrams showing data flow including bypass connections
  • Dataclass Blocks: Type-safe, reusable processing blocks using Python dataclasses
  • Parameter Variants: Explore parameter spaces with automatic execution graph generation
  • Port Optimization: Automatic detection of which metadata ports each operation uses

Installation

pip install sigexec

Or from source:

git clone https://github.com/briday1/sigexec.git
cd sigexec
pip install -e .

Quick Example

from sigexec import Graph, GraphData
import numpy as np

# Create a simple processing graph
graph = Graph("Processing")

graph.add(lambda g: GraphData(g.data * 2, metadata=g.metadata), name="Scale")
graph.add(lambda g: GraphData(g.data + 10, metadata=g.metadata), name="Offset")

result = graph.run(GraphData(np.array([1.0, 2.0, 3.0])))
print(result.data)  # [12. 14. 16.]

# Visualize the flow
print(graph.to_mermaid())

Output:

flowchart TD
    node0[Scale]
    node1[Offset]
    node0 --|data|--> node1

Dataclass Blocks

Create reusable processing blocks as dataclasses:

from dataclasses import dataclass
from sigexec import Graph, GraphData
import numpy as np

@dataclass
class Scaler:
    factor: float = 2.0
    
    def __call__(self, gdata: GraphData) -> GraphData:
        return GraphData(
            gdata.data * self.factor,
            metadata={**gdata.metadata, 'scaled_by': self.factor}
        )

@dataclass
class Filter:
    cutoff: float = 0.5
    
    def __call__(self, gdata: GraphData) -> GraphData:
        # Apply your filtering logic
        filtered = gdata.data * (1.0 - self.cutoff)
        return GraphData(filtered, metadata=gdata.metadata)

# Use them in a graph
graph = Graph("MyPipeline")
graph.add(Scaler(factor=3.0), name="Scale")
graph.add(Filter(cutoff=0.2), name="Filter")

result = graph.run(GraphData(np.array([1.0, 2.0, 3.0])))

Port Bypass Optimization

Operations automatically bypass intermediate steps when they don't need specific ports:

from sigexec import Graph, GraphData
import numpy as np

def generator(gdata):
    """Produces multiple outputs"""
    return GraphData(
        gdata.data * 2,
        metadata={
            'config': {'mode': 'fast'},
            'timestamp': 12345
        }
    )

def processor(gdata):
    """Only uses 'data' - doesn't touch config or timestamp"""
    return GraphData(gdata.data + 10, metadata=gdata.metadata)

def finalizer(gdata):
    """Uses data AND the original config (which bypasses processor)"""
    config = gdata.metadata.get('config', {})
    mode = config.get('mode', 'unknown')
    result = gdata.data * 3 if mode == 'fast' else gdata.data
    return GraphData(result, metadata=gdata.metadata)

graph = Graph("PortBypass")
graph.add(generator, name="Gen")
graph.add(processor, name="Process")
graph.add(finalizer, name="Final")

print(graph.to_mermaid())

Output shows bypass connection (dotted line):

flowchart TD
    node0[Gen]
    node1[Process]
    node2[Final]
    node0 --|data|--> node1
    node1 --|data|--> node2
    node0 -.config.-> node2

The config port flows directly from Gen→Final, bypassing Process entirely!

Parameter Variants

Explore parameter spaces with automatic visualization:

from sigexec import Graph, GraphData
import numpy as np

def make_scaler(factor):
    return lambda g: GraphData(g.data * factor, metadata=g.metadata)

def make_offsetter(amount):
    return lambda g: GraphData(g.data + amount, metadata=g.metadata)

graph = Graph("Variants")
graph.add(lambda g: GraphData(np.array([1.0, 2.0, 3.0]), metadata=g.metadata), name="Generate")
graph.variant(make_scaler, [2.0, 3.0, 5.0], name='Scale')
graph.variant(make_offsetter, [10.0, 20.0], name='Offset')
graph.add(lambda g: GraphData(g.data, metadata={
    **g.metadata, 
    'mean': float(np.mean(g.data))
}), name="Stats")

print(graph.to_mermaid())

Hexagons indicate operations with variants:

flowchart TD
    node0[Generate]
    node1{{Scale}}
    node2{{Offset}}
    node3[Stats]
    node0 --|data|--> node1
    node1 --|data|--> node2
    node2 --|data|--> node3

The graph executes all combinations: 3 scale factors × 2 offsets = 6 total executions.

Radar Processing Example

The included radar processing blocks demonstrate a complete application:

from sigexec import Graph
from sigexec.blocks import LFMGenerator, StackPulses, RangeCompress, DopplerCompress

graph = (Graph("Radar Processing")
    .add(LFMGenerator(num_pulses=128, target_delay=20e-6, target_doppler=1000.0), name="Gen")
    .add(StackPulses(), name="Stack")
    .add(RangeCompress(window='hamming', oversample_factor=2), name="RangeCompress")
    .add(DopplerCompress(), name="Doppler"))

result = graph.run()
range_doppler_map = result.data

print(graph.to_mermaid())

Execution graph showing efficient port routing:

flowchart TD
    node0[Gen]
    node1[Stack]
    node2[RangeCompress]
    node3[Doppler]
    node0 --|data|--> node1
    node1 --|data|--> node2
    node0 -.reference_pulse.-> node2
    node2 --|data|--> node3
    node0 -.pulse_repetition_interval.-> node3

Notice how reference_pulse and pulse_repetition_interval bypass operations that don't need them!

Creating Custom Blocks

Blocks are just dataclasses with __call__ methods:

from dataclasses import dataclass
from sigexec import GraphData
import numpy as np

@dataclass
class MyProcessor:
    """Your custom processing block"""
    param1: float = 1.0
    param2: str = "default"
    
    def __call__(self, gdata: GraphData) -> GraphData:
        # Read input data
        input_data = gdata.data
        
        # Access metadata if needed
        config = gdata.metadata.get('config', {})
        
        # Process
        output_data = input_data * self.param1
        
        # Return new GraphData with results
        return GraphData(
            output_data,
            metadata={
                **gdata.metadata,
                'processed_with': self.param2
            }
        )

That's it! Use it in any graph:

graph = Graph("Custom")
graph.add(MyProcessor(param1=2.5, param2="advanced"), name="MyBlock")

Documentation

Key Concepts

GraphData

The standard data container that flows through operations. Has:

  • data: numpy array (the main signal/data)
  • metadata: dict of additional named ports (configs, timestamps, reference signals, etc.)

Ports

Named data fields in metadata. Operations declare which ports they use via:

  • Static analysis (AST inspection of function code)
  • Runtime detection (observing metadata access patterns)

This enables automatic optimization - ports flow directly from producer to consumer, bypassing intermediate operations.

Visualization

  • Solid lines (-->) show direct flow between adjacent operations
  • Dotted lines (-.->) show bypass connections where ports skip operations
  • Rectangles [Name] show regular operations
  • Hexagons {{Name}} show operations with parameter variants

Requirements

  • Python >= 3.7
  • numpy >= 1.20.0
  • scipy >= 1.7.0
  • matplotlib >= 3.3.0
  • plotly >= 5.0.0

License

MIT License - see LICENSE file for details.

Contributing

Contributions welcome! Please feel free to submit a Pull Request.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

sigexec-2026.49.tar.gz (33.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

sigexec-2026.49-py3-none-any.whl (31.5 kB view details)

Uploaded Python 3

File details

Details for the file sigexec-2026.49.tar.gz.

File metadata

  • Download URL: sigexec-2026.49.tar.gz
  • Upload date:
  • Size: 33.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.14

File hashes

Hashes for sigexec-2026.49.tar.gz
Algorithm Hash digest
SHA256 481d8b4fbf94304caf6bab3fa6bb28a5935cc170075f2aa03e203b19b22b4d43
MD5 a2696ab4e5485c5af1898652c8357733
BLAKE2b-256 043441098b96c511b9a66e1391ea5920f7663688bc7a6f55974813928dd44364

See more details on using hashes here.

File details

Details for the file sigexec-2026.49-py3-none-any.whl.

File metadata

  • Download URL: sigexec-2026.49-py3-none-any.whl
  • Upload date:
  • Size: 31.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.14

File hashes

Hashes for sigexec-2026.49-py3-none-any.whl
Algorithm Hash digest
SHA256 71b1420d2ec79c5a6f46e8716beccf46c5137ecd658b2422dfd5c4cfa8b61c1e
MD5 bb5c9d711e6b850080cc8a633c7202df
BLAKE2b-256 4ad94144d05a67eed00154be4255d14fb5f7368be601026b28851d33ff5e0069

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page