Signal processing execution graph using DAG and data classes

sigexec

Signal processing execution graphs with efficient port-based data flow.

Features

  • Efficient Execution: Static analysis determines optimal port routing - operations receive only the data they need
  • Flow Visualization: Automatic mermaid diagrams showing data flow including bypass connections
  • Dataclass Blocks: Type-safe, reusable processing blocks using Python dataclasses
  • Parameter Variants: Explore parameter spaces with automatic execution graph generation
  • Port Optimization: Automatic detection of which metadata ports each operation uses

Installation

pip install sigexec

Or from source:

git clone https://github.com/briday1/sigexec.git
cd sigexec
pip install -e .

Quick Example

from sigexec import Graph, GraphData
import numpy as np

# Create a simple processing graph
graph = Graph("Processing")

graph.add(lambda g: GraphData(g.data * 2, metadata=g.metadata), name="Scale")
graph.add(lambda g: GraphData(g.data + 10, metadata=g.metadata), name="Offset")

result = graph.run(GraphData(np.array([1.0, 2.0, 3.0])))
print(result.data)  # [12. 14. 16.]

# Visualize the flow
print(graph.to_mermaid())

Output:

flowchart TD
    node0[Scale]
    node1[Offset]
    node0 --|data|--> node1

Dataclass Blocks

Create reusable processing blocks as dataclasses:

from dataclasses import dataclass
from sigexec import Graph, GraphData
import numpy as np

@dataclass
class Scaler:
    factor: float = 2.0
    
    def __call__(self, gdata: GraphData) -> GraphData:
        return GraphData(
            gdata.data * self.factor,
            metadata={**gdata.metadata, 'scaled_by': self.factor}
        )

@dataclass
class Filter:
    cutoff: float = 0.5
    
    def __call__(self, gdata: GraphData) -> GraphData:
        # Apply your filtering logic
        filtered = gdata.data * (1.0 - self.cutoff)
        return GraphData(filtered, metadata=gdata.metadata)

# Use them in a graph
graph = Graph("MyPipeline")
graph.add(Scaler(factor=3.0), name="Scale")
graph.add(Filter(cutoff=0.2), name="Filter")

result = graph.run(GraphData(np.array([1.0, 2.0, 3.0])))
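Because blocks are ordinary dataclasses, the standard library's `dataclasses` helpers apply to them. The sketch below is independent of sigexec: its `Scaler` is a simplified stand-in that works on plain lists (an assumption made so the example runs without the package), and it only illustrates what the dataclass machinery provides.

```python
from dataclasses import dataclass, replace, asdict

@dataclass
class Scaler:
    """Simplified stand-in block: multiplies a list of floats (not the sigexec API)."""
    factor: float = 2.0

    def __call__(self, values):
        return [v * self.factor for v in values]

base = Scaler(factor=3.0)

# Derive a new block with one parameter changed; the original is untouched.
wider = replace(base, factor=5.0)

# Parameters can be inspected as a plain dict, e.g. for logging.
params = asdict(base)

# Generated __eq__ compares blocks by their parameter values.
same = base == Scaler(factor=3.0)

scaled = wider([1.0, 2.0])
```

Here `replace` and `asdict` come from the Python standard library, so they work on any dataclass-based block without extra code.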

Port Bypass Optimization

Ports automatically bypass intermediate operations that don't use them:

from sigexec import Graph, GraphData
import numpy as np

def generator(gdata):
    """Produces multiple outputs"""
    return GraphData(
        gdata.data * 2,
        metadata={
            'config': {'mode': 'fast'},
            'timestamp': 12345
        }
    )

def processor(gdata):
    """Only uses 'data' - doesn't touch config or timestamp"""
    return GraphData(gdata.data + 10, metadata=gdata.metadata)

def finalizer(gdata):
    """Uses data AND the original config (which bypasses processor)"""
    config = gdata.metadata.get('config', {})
    mode = config.get('mode', 'unknown')
    result = gdata.data * 3 if mode == 'fast' else gdata.data
    return GraphData(result, metadata=gdata.metadata)

graph = Graph("PortBypass")
graph.add(generator, name="Gen")
graph.add(processor, name="Process")
graph.add(finalizer, name="Final")

print(graph.to_mermaid())

The output shows the bypass connection as a dotted line:

flowchart TD
    node0[Gen]
    node1[Process]
    node2[Final]
    node0 --|data|--> node1
    node1 --|data|--> node2
    node0 -.config.-> node2

The config port flows directly from Gen→Final, bypassing Process entirely!
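The routing idea can be sketched in a few lines of plain Python. This is not sigexec's implementation: `route_ports` and the hand-written `produces`/`uses` sets are hypothetical, and the sketch assumes each port is taken from the most recent earlier operation that produced it.

```python
def route_ports(ops):
    """Connect each consumed port to its most recent producer.

    ops: list of (name, produces, uses) tuples in execution order.
    Returns (source, target, port, is_bypass) tuples; is_bypass is True
    when at least one operation sits between source and target.
    """
    edges = []
    last_producer = {}  # port name -> index of the latest op that produced it
    for i, (name, produces, uses) in enumerate(ops):
        for port in sorted(uses):
            if port in last_producer:
                j = last_producer[port]
                edges.append((ops[j][0], name, port, i - j > 1))
        for port in produces:
            last_producer[port] = i
    return edges

# Hand-written port sets mirroring the Gen/Process/Final example (assumed, not detected).
ops = [
    ("Gen", {"data", "config", "timestamp"}, {"data"}),  # input data comes from outside the graph
    ("Process", {"data"}, {"data"}),
    ("Final", {"data"}, {"data", "config"}),
]

edges = route_ports(ops)
for source, target, port, is_bypass in edges:
    print(source, "->", target, port, "(bypass)" if is_bypass else "")
```

Under these assumptions the only bypass edge is `config` from Gen to Final, and `timestamp` produces no edge at all because nothing reads it.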

Parameter Variants

Explore parameter spaces with automatic visualization:

from sigexec import Graph, GraphData
import numpy as np

def make_scaler(factor):
    return lambda g: GraphData(g.data * factor, metadata=g.metadata)

def make_offsetter(amount):
    return lambda g: GraphData(g.data + amount, metadata=g.metadata)

graph = Graph("Variants")
graph.add(lambda g: GraphData(np.array([1.0, 2.0, 3.0]), metadata=g.metadata), name="Generate")
graph.variant(make_scaler, [2.0, 3.0, 5.0], name='Scale')
graph.variant(make_offsetter, [10.0, 20.0], name='Offset')
graph.add(lambda g: GraphData(g.data, metadata={
    **g.metadata, 
    'mean': float(np.mean(g.data))
}), name="Stats")

print(graph.to_mermaid())

Hexagons indicate operations with variants:

flowchart TD
    node0[Generate]
    node1{{Scale}}
    node2{{Offset}}
    node3[Stats]
    node0 --|data|--> node1
    node1 --|data|--> node2
    node2 --|data|--> node3

The graph executes all combinations: 3 scale factors × 2 offsets = 6 total executions.
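The count of six follows from taking the Cartesian product of the two parameter lists. The sketch below reproduces that enumeration with `itertools.product` on plain Python lists; it does not use sigexec, and how the library schedules the runs internally is not shown here.

```python
from itertools import product

def make_scaler(factor):
    return lambda values: [v * factor for v in values]

def make_offsetter(amount):
    return lambda values: [v + amount for v in values]

scale_factors = [2.0, 3.0, 5.0]
offsets = [10.0, 20.0]

# One run per (factor, amount) pair: 3 x 2 = 6 combinations.
means = {}
for factor, amount in product(scale_factors, offsets):
    scale, offset = make_scaler(factor), make_offsetter(amount)
    data = offset(scale([1.0, 2.0, 3.0]))
    means[(factor, amount)] = sum(data) / len(data)

for combo, mean in means.items():
    print(combo, mean)
```

Adding a third variant list with four values would multiply the run count to 24, so variant sweeps grow quickly.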

Radar Processing Example

The included radar processing blocks demonstrate a complete application:

from sigexec import Graph
from sigexec.blocks import LFMGenerator, StackPulses, RangeCompress, DopplerCompress

graph = (Graph("Radar Processing")
    .add(LFMGenerator(num_pulses=128, target_delay=20e-6, target_doppler=1000.0), name="Gen")
    .add(StackPulses(), name="Stack")
    .add(RangeCompress(window='hamming', oversample_factor=2), name="RangeCompress")
    .add(DopplerCompress(), name="Doppler"))

result = graph.run()
range_doppler_map = result.data

print(graph.to_mermaid())

Execution graph showing efficient port routing:

flowchart TD
    node0[Gen]
    node1[Stack]
    node2[RangeCompress]
    node3[Doppler]
    node0 --|data|--> node1
    node1 --|data|--> node2
    node0 -.reference_pulse.-> node2
    node2 --|data|--> node3
    node0 -.pulse_repetition_interval.-> node3

Notice how reference_pulse and pulse_repetition_interval bypass operations that don't need them!

Creating Custom Blocks

Blocks are just dataclasses with __call__ methods:

from dataclasses import dataclass
from sigexec import GraphData
import numpy as np

@dataclass
class MyProcessor:
    """Your custom processing block"""
    param1: float = 1.0
    param2: str = "default"
    
    def __call__(self, gdata: GraphData) -> GraphData:
        # Read input data
        input_data = gdata.data
        
        # Access metadata if needed
        config = gdata.metadata.get('config', {})
        
        # Process
        output_data = input_data * self.param1
        
        # Return new GraphData with results
        return GraphData(
            output_data,
            metadata={
                **gdata.metadata,
                'processed_with': self.param2
            }
        )

That's it! Use it in any graph:

from sigexec import Graph

graph = Graph("Custom")
graph.add(MyProcessor(param1=2.5, param2="advanced"), name="MyBlock")

Documentation

Key Concepts

GraphData

The standard data container that flows between operations. It has:

  • data: numpy array (the main signal/data)
  • metadata: dict of additional named ports (configs, timestamps, reference signals, etc.)

Ports

Named data fields in metadata. The library determines which ports each operation uses via:

  • Static analysis (AST inspection of function code)
  • Runtime detection (observing metadata access patterns)

This enables automatic optimization - ports flow directly from producer to consumer, bypassing intermediate operations.
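Both detection strategies can be illustrated with the standard library alone. The sketch below is an assumption about how such detection might work, not sigexec's actual code: `ports_read_statically` and `TrackingDict` are hypothetical names, the static pass only recognizes literal string keys on a parameter named `gdata`, and it relies on the Python 3.9+ AST layout for subscripts.

```python
import ast

def ports_read_statically(source, param="gdata"):
    """Find literal keys read via <param>.metadata.get("k") or <param>.metadata["k"]."""
    def is_metadata(node):
        return (isinstance(node, ast.Attribute) and node.attr == "metadata"
                and isinstance(node.value, ast.Name) and node.value.id == param)

    def literal_key(node):
        if isinstance(node, ast.Constant) and isinstance(node.value, str):
            return node.value
        return None

    ports = set()
    for node in ast.walk(ast.parse(source)):
        key = None
        if (isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute)
                and node.func.attr == "get" and is_metadata(node.func.value)
                and node.args):
            key = literal_key(node.args[0])
        elif (isinstance(node, ast.Subscript) and isinstance(node.ctx, ast.Load)
                and is_metadata(node.value)):
            key = literal_key(node.slice)
        if key is not None:
            ports.add(key)
    return ports

class TrackingDict(dict):
    """Runtime variant: records every key looked up with [] or .get()."""
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.accessed = set()

    def __getitem__(self, key):
        self.accessed.add(key)
        return super().__getitem__(key)

    def get(self, key, default=None):
        self.accessed.add(key)
        return super().get(key, default)

SOURCE = '''
def finalizer(gdata):
    config = gdata.metadata.get('config', {})
    mode = config.get('mode', 'unknown')
    return mode
'''

static_ports = ports_read_statically(SOURCE)

tracked = TrackingDict({"config": {"mode": "fast"}, "timestamp": 12345})
mode = tracked.get("config", {}).get("mode", "unknown")
runtime_ports = tracked.accessed
```

The static pass cannot see keys built at run time (for example `metadata[name]`), which is one reason a runtime fallback like the tracking dictionary is useful.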

Visualization

  • Solid lines (-->) show direct flow between adjacent operations
  • Dotted lines (-.->) show bypass connections where ports skip operations
  • Rectangles [Name] show regular operations
  • Hexagons {{Name}} show operations with parameter variants

Requirements

  • Python >= 3.7
  • numpy >= 1.20.0
  • scipy >= 1.7.0
  • matplotlib >= 3.3.0
  • plotly >= 5.0.0

License

MIT License - see LICENSE file for details.

Contributing

Contributions welcome! Please feel free to submit a Pull Request.
