A package for working with graphs representing minimal implicational logic models
Project description
Implica
A Python package for working with graphs representing minimal implicational logic models. This library provides tools for constructing and manipulating type systems based on combinatory logic, with support for transactional graph operations.
Import as imp for a clean, concise API!
Features
- 🎯 Type System: Build complex type expressions using variables and applications (function types)
- 🔍 Variable Extraction: Recursively extract all variables from any type expression
- 🧩 Combinators: Work with S and K combinators from combinatory logic
- 🔎 Type Pattern Matching: Match types against patterns with type variables (TypeSchema)
- 🎨 Schema-Based Queries: Filter nodes and edges by type structure using pattern matching
- 🔧 Schema-Based Mutations: Add, remove, and update elements matching type patterns
- 📊 Graph Structure: Represent type transformations as nodes and edges in a directed graph
- 🔄 Transactional Operations: Safely modify graphs with automatic rollback on failure
- 📦 Bulk Operations: Add multiple nodes or edges atomically with
add_many_nodesandadd_many_edges - 🏷️ Properties: Attach custom metadata dictionaries to nodes and edges for enriched graph semantics
- 🛡️ Idempotent Mutations: Use
try_add_nodeandtry_add_edgefor safe, duplicate-tolerant operations - ✅ Validation: Ensure graph consistency with built-in validation
- 🚀 Performance: Optimized data structures for O(1) lookups and efficient traversal
Installation
Using Poetry (recommended)
poetry add implica
Using pip
pip install implica
From source
git clone https://github.com/CarlosFerLo/implicational-logic-graph.git
cd implicational-logic-graph
poetry install
Quick Start
import implica as imp
# Create type variables
A = imp.var("A")
B = imp.var("B")
# Create a function type: A -> B
func_type = imp.app(A, B)
# Create a graph with nodes
graph = imp.Graph()
with graph.connect() as conn:
conn.add_node(imp.node(A))
conn.add_node(imp.node(B))
print(f"Graph has {graph.node_count()} nodes")
Core Concepts
Types
The library provides a type system for minimal implicational logic:
- Variable: Atomic type variables (e.g.,
A,B,C) - Application: Function types representing
input_type -> output_type
import implica as imp
# Simple type variables
A = imp.var("A")
B = imp.var("B")
C = imp.var("C")
# Function type: A -> B
simple_function = imp.app(A, B)
# Complex type: (A -> B) -> C
complex_function = imp.app(imp.app(A, B), C)
# Nested type: A -> (B -> C)
nested_function = imp.app(A, imp.app(B, C))
print(simple_function) # Output: A -> B
print(complex_function) # Output: (A -> B) -> C
print(nested_function) # Output: A -> B -> C
Parsing Types from Strings
You can also create types by parsing string representations using type_from_string():
import implica as imp
# Parse simple variables
A = imp.type_from_string("A")
print(A) # Output: A
# Parse function types
func = imp.type_from_string("A -> B")
print(func) # Output: A -> B
# Parse nested types with right-associativity
nested = imp.type_from_string("A -> B -> C")
print(nested) # Output: A -> B -> C
# This is equivalent to: A -> (B -> C)
# Parse with explicit parentheses
left_assoc = imp.type_from_string("(A -> B) -> C")
print(left_assoc) # Output: (A -> B) -> C
# Parse complex nested types
complex = imp.type_from_string("((A -> B) -> C) -> (D -> E)")
print(complex) # Output: ((A -> B) -> C) -> D -> E
# Multi-character variable names
person_func = imp.type_from_string("Person -> String")
print(person_func) # Output: Person -> String
# Variables with numbers and underscores
data_type = imp.type_from_string("data_1 -> result_2")
print(data_type) # Output: data_1 -> result_2
# Unicode characters are supported
greek = imp.type_from_string("α -> β -> γ")
print(greek) # Output: α -> β -> γ
Parsing rules:
- The arrow operator
->is right-associative:A -> B -> Cis parsed asA -> (B -> C) - Use parentheses to override associativity:
(A -> B) -> C - Variable names can contain letters, numbers, and underscores
- Whitespace is ignored:
A->BandA -> Bare equivalent - Unicode characters in variable names are supported
Error handling:
import implica as imp
# Empty string raises ValueError
try:
imp.type_from_string("")
except ValueError as e:
print(f"Error: {e}") # Error: Type string cannot be empty
# Mismatched parentheses
try:
imp.type_from_string("(A -> B")
except ValueError as e:
print(f"Error: {e}") # Error: Mismatched parentheses
# Invalid characters
try:
imp.type_from_string("A @ B")
except ValueError as e:
print(f"Error: {e}") # Error: Invalid character '@' at position 2
# Missing operand
try:
imp.type_from_string("A ->")
except ValueError as e:
print(f"Error: {e}") # Error: Unexpected end of input
Use cases:
- Parse type expressions from configuration files or user input
- Create types dynamically from strings
- Simplify type construction with readable syntax
- Testing and debugging with human-readable type representations
Extracting Variables from Types
Every type has a variables property that returns a list of all variables contained in that type. This is computed recursively and may include duplicate variables if they appear multiple times:
import implica as imp
# Simple variable returns itself
A = imp.var("A")
print(A.variables) # Output: [Variable(name='A')]
# Application returns all variables from both input and output
B = imp.var("B")
func = imp.app(A, B)
print([v.name for v in func.variables]) # Output: ['A', 'B']
# Nested types return all variables recursively
C = imp.var("C")
nested = imp.app(imp.app(A, B), C) # (A -> B) -> C
print([v.name for v in nested.variables]) # Output: ['A', 'B', 'C']
# Duplicates are included
same_var = imp.app(A, A) # A -> A
print([v.name for v in same_var.variables]) # Output: ['A', 'A']
# Complex example with duplicates
complex = imp.app(imp.app(A, B), A) # (A -> B) -> A
print([v.name for v in complex.variables]) # Output: ['A', 'B', 'A']
Use cases:
- Analyze which variables are used in a complex type expression
- Count occurrences of specific variables in a type
- Validate that certain variables are present or absent
- Generate variable lists for quantification or substitution operations
Combinators
Combinators represent transformations between types. The library includes the S and K combinators from combinatory logic.
Important: All combinators must have an Application type (function type), not a simple Variable. This ensures that combinators represent actual transformations from input types to output types. Attempting to create a combinator with a non-Application type will raise a ValidationError.
import implica as imp
A = imp.var("A")
B = imp.var("B")
C = imp.var("C")
# S combinator: (A -> B -> C) -> (A -> B) -> A -> C
s_comb = imp.S(A, B, C)
print(s_comb) # Output: S: (A -> B -> C) -> (A -> B) -> A -> C
# K combinator: A -> B -> A
k_comb = imp.K(A, B)
print(k_comb) # Output: K: A -> B -> A
# Custom combinator (must have Application type)
identity = imp.Combinator(name="I", type=imp.app(A, A))
print(identity) # Output: I: A -> A
# This will FAIL - cannot create combinator with Variable type:
try:
invalid_comb = imp.Combinator(name="X", type=A) # A is a Variable, not Application!
except ValidationError as e:
print(f"Error: {e}") # Error: type must be an Application
TypeSchema - Pattern Matching for Types
TypeSchema allows you to perform pattern matching on types using type variables, similar to unification in type systems. This is useful for:
- Checking if a type conforms to a specific pattern
- Extracting type components from complex types
- Validating combinator types against their expected signatures
- Generic type manipulation and analysis
Pattern Variables vs Exact Matching
TypeSchema supports two modes of variable matching:
- Pattern Variables (without prefix): Act as wildcards that can match any type
- Exact Match Variables (with
$prefix): Must match exactly the named type
import implica as imp
# Pattern variables act as wildcards
A = imp.var("A")
B = imp.var("B")
# Schema with pattern variable - matches ANY types
schema = imp.TypeSchema(pattern=imp.app(A, B))
result = schema.match(imp.app(imp.var("X"), imp.var("Y")))
# ✓ Matches! A binds to X, B binds to Y
# Exact match variables - require exact name match
exact_schema = imp.TypeSchema(pattern=imp.app(imp.var("$A"), imp.var("$B")))
result1 = exact_schema.match(imp.app(imp.var("A"), imp.var("B")))
# ✓ Matches! $A matches A exactly, $B matches B exactly
result2 = exact_schema.match(imp.app(imp.var("X"), imp.var("Y")))
# ✗ Fails! $A expects exactly "A", but got "X"
When to use each mode:
- Use pattern variables (no prefix) for generic pattern matching and extracting type components
- Use exact match variables (
$prefix) when you need to match specific named types - Mix both modes in the same pattern for flexible matching
# Mixed example: exact match for input, pattern variable for output
mixed = imp.TypeSchema(pattern=imp.app(imp.var("$Person"), imp.var("Y")))
result1 = mixed.match(imp.app(imp.var("Person"), imp.var("String")))
# ✓ Matches! $Person matches Person, Y binds to String
result2 = mixed.match(imp.app(imp.var("Employee"), imp.var("String")))
# ✗ Fails! $Person expects exactly "Person", not "Employee"
Basic Usage
import implica as imp
# Create a schema with pattern variables A and B
schema = imp.TypeSchema(pattern=imp.type_from_string("A -> B"))
# Try to match a concrete type
target = imp.type_from_string("X -> Y")
result = schema.match(target)
if result:
print(f"Match successful!")
print(f"A = {result.bindings['A']}") # A = X (pattern variable A matched X)
print(f"B = {result.bindings['B']}") # B = Y (pattern variable B matched Y)
# Using exact match
exact_schema = imp.TypeSchema(pattern=imp.app(imp.var("$Int"), imp.var("$String")))
result1 = exact_schema.match(imp.app(imp.var("Int"), imp.var("String")))
print(result1 is not None) # True - exact match!
result2 = exact_schema.match(imp.app(imp.var("Float"), imp.var("String")))
print(result2 is not None) # False - Int doesn't match Float
K Combinator Pattern Example
The K combinator has type A -> B -> A, where the same type variable A appears twice:
# Create a schema for the K pattern
k_schema = imp.schema(imp.type_from_string("A -> B -> A"))
# This fails - wrong structure (only 2 components instead of 3)
result1 = k_schema.match(imp.type_from_string("X -> Y"))
# result1 == None
# This succeeds - matches the pattern
result2 = k_schema.match(imp.type_from_string("(A -> B) -> C -> (A -> B)"))
# result2.bindings == {'A': (A -> B), 'B': C}
Consistency Checking
Type variables must match consistently throughout the pattern:
# Pattern A -> A requires both sides to be the same
id_schema = imp.schema(imp.type_from_string("A -> A"))
# Succeeds - both sides are X
result1 = id_schema.match(imp.type_from_string("X -> X"))
# result1.bindings == {'A': X}
# Fails - X and Y are different
result2 = id_schema.match(imp.type_from_string("X -> Y"))
# result2 == None
Variables Bind to Complex Types
Type variables can bind to any type, including complex ones:
schema = imp.schema(imp.type_from_string("A -> B"))
target = imp.type_from_string("(X -> Y) -> (Z -> W)")
result = schema.match(target)
# result.bindings == {'A': (X -> Y), 'B': (Z -> W)}
Validating Combinator Types
Check if a combinator matches its expected type pattern:
# Create a K combinator with specific types
k_combinator = imp.K(imp.var("Int"), imp.var("String"))
# k_combinator.type == Int -> String -> Int
# Check if it matches the general K pattern
k_pattern = imp.schema(imp.type_from_string("A -> B -> A"))
result = k_pattern.match(k_combinator.type)
if result:
print(f"K combinator is valid!")
print(f"A = {result.bindings['A']}") # A = Int
print(f"B = {result.bindings['B']}") # B = String
How TypeSchema Works
The matching algorithm implements type unification with two modes:
- Pattern Variable Matching (no prefix): When the pattern is a type variable without
$(e.g.,A,X), it acts as a wildcard that binds to any target type - Exact Matching (with
$prefix): When the pattern is a type variable with$(e.g.,$A,$Person), it requires the target to be a Variable with exactly that name (without the$) - Consistency Checking: If a pattern variable is already bound, new bindings must match the existing one
- Structural Matching: For application types (
->), both input and output must match recursively - Failure: Returns
Noneif any part of the match fails
Matching Examples:
# Pattern variable - matches anything
pattern_var = imp.TypeSchema(pattern=imp.var("X"))
pattern_var.match(imp.var("A")) # ✓ X binds to A
pattern_var.match(imp.var("B")) # ✓ X binds to B
pattern_var.match(imp.app(imp.var("A"), imp.var("B"))) # ✓ X binds to A -> B
# Exact match - requires specific name
exact_var = imp.TypeSchema(pattern=imp.var("$A"))
exact_var.match(imp.var("A")) # ✓ $A matches A exactly
exact_var.match(imp.var("B")) # ✗ $A expects A, got B
exact_var.match(imp.app(imp.var("A"), imp.var("B"))) # ✗ $A expects Variable, got Application
# Structural matching with exact constraints
schema = imp.TypeSchema(pattern=imp.app(imp.var("$Int"), imp.var("Y")))
schema.match(imp.app(imp.var("Int"), imp.var("String"))) # ✓ $Int matches Int, Y binds to String
schema.match(imp.app(imp.var("Float"), imp.var("String"))) # ✗ $Int expects Int, got Float
TypeSchema API:
TypeSchema(pattern: BaseType): Create a new type schema- Use variables without
$prefix for wildcard pattern matching - Use variables with
$prefix (e.g.,var("$A")) for exact name matching
- Use variables without
schema.match(target: BaseType) -> Optional[TypeMatch]: Try to match a type against the patternschema.get_pattern_variables() -> Set[str]: Get all variable names in the pattern (includes$prefix if present)TypeMatch: Result of a successful matchoriginal_type: BaseType: The type that was matchedbindings: Dict[str, BaseType]: Variable name → matched type (pattern variables only, not exact matches)
schema(pattern: BaseType) -> TypeSchema: Helper function to create a schema
Graph Elements
Nodes
Nodes represent types in the graph and can have optional properties (metadata):
import implica as imp
A = imp.var("A")
B = imp.var("B")
# Create nodes without properties
node_a = imp.node(A)
node_b = imp.node(B)
node_func = imp.node(imp.app(A, B))
print(node_a) # Output: Node(A)
print(node_func) # Output: Node(A -> B)
# Create nodes with properties
node_with_props = imp.node(A, properties={"color": "red", "weight": 1.5})
print(node_with_props.properties) # Output: {'color': 'red', 'weight': 1.5}
# Properties can be any JSON-serializable data
node_complex = imp.node(B, properties={
"metadata": {"author": "user1", "timestamp": 123456},
"tags": ["important", "test"],
"config": {"timeout": 30}
})
Properties Features:
- Optional: Nodes without properties have an empty dict
{} - Immutable: Properties are frozen with the node
- Flexible: Can store any JSON-serializable data
- Non-identifying: Two nodes with the same type have the same UID regardless of properties
Edges
Edges represent combinator transformations between types and can have optional properties. The combinator must have an Application type where the input matches the source node and the output matches the destination node.
import implica as imp
A = imp.var("A")
B = imp.var("B")
# Create nodes
n1 = imp.node(A)
n2 = imp.node(B)
# Create a combinator that transforms A to B (must be Application type!)
comb = imp.Combinator(name="f", type=imp.app(A, B))
# Create an edge without properties
e = imp.Edge(src_node=n1, dst_node=n2, combinator=comb)
print(e) # Output: A --[f: A -> B]--> B
# Create an edge with properties
e_with_props = imp.Edge(
src_node=n1,
dst_node=n2,
combinator=comb,
properties={"weight": 2.5, "label": "transition"}
)
print(e_with_props.properties) # Output: {'weight': 2.5, 'label': 'transition'}
# Alternative: use the edge helper function
# This automatically creates nodes from the combinator's input/output types
e2 = imp.edge(comb)
print(e2) # Output: A --[f: A -> B]--> B
# Edge helper with properties
e3 = imp.edge(comb, properties={"cost": 10})
print(e3.properties) # Output: {'cost': 10}
Properties Features:
- Optional: Edges without properties have an empty dict
{} - Immutable: Properties are frozen with the edge
- Flexible: Can store metrics, labels, costs, etc.
- Non-identifying: Two edges with the same structure have the same UID regardless of properties
Edge Validation: When creating an edge, the following validations are performed:
- The combinator must have an Application type (not a Variable)
- The combinator's input type must match the source node's type
- The combinator's output type must match the destination node's type
If any validation fails, a ValueError is raised.
Graph Operations
Creating and Modifying Graphs
import implica as imp
# Create an empty graph
graph = imp.Graph()
# Create types and nodes
A = imp.var("A")
B = imp.var("B")
C = imp.var("C")
n_a = imp.node(A)
n_b = imp.node(B)
n_c = imp.node(C)
# Use transactional connections to modify the graph
with graph.connect() as conn:
conn.add_node(n_a)
conn.add_node(n_b)
conn.add_node(n_c)
print(f"Nodes: {graph.node_count()}") # Output: Nodes: 3
# Add edges
comb_ab = imp.Combinator(name="f", type=imp.app(A, B))
comb_bc = imp.Combinator(name="g", type=imp.app(B, C))
with graph.connect() as conn:
conn.add_edge(comb_ab)
conn.add_edge(comb_bc)
print(f"Edges: {graph.edge_count()}") # Output: Edges: 2
Adding Nodes and Edges with Properties
You can add properties to nodes and edges directly through the connection API:
import implica as imp
graph = imp.Graph()
A = imp.var("A")
B = imp.var("B")
C = imp.var("C")
# Add nodes with properties using type
with graph.connect() as conn:
conn.add_node(type=A, properties={"color": "red", "weight": 1.5})
conn.add_node(type=B, properties={"color": "blue", "weight": 2.0})
conn.add_node(type=C) # Without properties
# Add edges with properties
comb = imp.Combinator(name="f", type=imp.app(A, B))
with graph.connect() as conn:
conn.add_edge(comb, properties={"cost": 10, "label": "primary"})
# Verify properties
node_a = graph.get_node(A.uid)
print(node_a.properties) # Output: {'color': 'red', 'weight': 1.5}
edges = list(graph.edges())
print(edges[0].properties) # Output: {'cost': 10, 'label': 'primary'}
Bulk Operations with Properties
When adding multiple nodes or edges, you can specify properties in two ways:
- Single dict: Applied to all elements
- List of dicts: One dict per element (must match count)
import implica as imp
graph = imp.Graph()
A, B, C = imp.var("A"), imp.var("B"), imp.var("C")
# Single properties dict for all nodes
with graph.connect() as conn:
conn.add_many_nodes(
types=[A, B, C],
properties={"category": "important"} # Applied to all
)
# Individual properties for each node
with graph.connect() as conn:
conn.add_many_nodes(
types=[A, B, C],
properties=[
{"color": "red"},
{"color": "blue"},
{"color": "green"}
]
)
# Same with edges
combs = [
imp.Combinator("f1", imp.app(A, B)),
imp.Combinator("f2", imp.app(B, C))
]
with graph.connect() as conn:
# Single weight for all edges
conn.add_many_edges(combs, properties={"weight": 1.0})
with graph.connect() as conn:
# Individual properties for each edge
conn.add_many_edges(
combs,
properties=[{"weight": 1.0}, {"weight": 2.0}]
)
# Validation: length must match
try:
with graph.connect() as conn:
conn.add_many_nodes(
types=[A, B],
properties=[{"x": 1}] # Only 1 dict for 2 types!
)
except ValueError as e:
print(f"Error: {e}") # Length mismatch error
Properties in Connection Methods:
All connection methods support properties:
add_node(type=..., properties={...})oradd_node(node=...)add_edge(combinator, properties={...})add_many_nodes(types=[...], properties=...)- Single dict or list of dictsadd_many_edges(combinators, properties=...)- Single dict or list of dictstry_add_node(type=..., properties={...})- Idempotent versiontry_add_edge(combinator, properties={...})- Idempotent versiontry_add_many_nodes(types=[...], properties=...)- Idempotent bulktry_add_many_edges(combinators, properties=...)- Idempotent bulk
Querying Graphs
# Get node by UID
node_retrieved = graph.get_node(n_a.uid)
# Get node by type
node_by_type = graph.get_node_by_type(A)
# Check if node exists
exists = graph.has_node(n_a.uid)
# Get outgoing edges from a node
outgoing = graph.get_outgoing_edges(n_a.uid)
print(f"Outgoing from A: {len(outgoing)}")
# Get incoming edges to a node
incoming = graph.get_incoming_edges(n_c.uid)
print(f"Incoming to C: {len(incoming)}")
# Iterate over all nodes
for n in graph.nodes():
print(f"Node: {n.type}")
# Iterate over all edges
for e in graph.edges():
print(f"Edge: {e}")
Graph Validation
# Validate graph consistency
try:
graph.validate()
print("Graph is valid!")
except ValueError as e:
print(f"Graph validation failed: {e}")
Advanced Mutations
The library provides several mutation types for flexible graph modifications:
Bulk Mutations
Add multiple nodes or edges in a single atomic operation:
import implica as imp
graph = imp.Graph()
# Create multiple nodes
nodes = [imp.node(imp.var(f"T{i}")) for i in range(5)]
# Add all nodes atomically - if any fails, all are rolled back
with graph.connect() as conn:
conn.add_many_nodes(nodes)
# Create multiple edges
A, B, C = imp.var("A"), imp.var("B"), imp.var("C")
combinators = [
imp.Combinator("f1", imp.app(A, B)),
imp.Combinator("f2", imp.app(B, C)),
]
# Add all edges atomically
with graph.connect() as conn:
conn.add_many_edges(combinators)
Benefits:
- All-or-nothing semantics: if any operation fails, all are rolled back
- More efficient than individual additions
- Cleaner code for batch operations
Removal Operations:
The same bulk operations are available for removing nodes and edges:
# Remove multiple nodes at once
node_uids = [n.uid for n in nodes[:3]]
with graph.connect() as conn:
conn.remove_many_nodes(node_uids)
# Remove multiple edges at once
edge_uids = [e.uid for e in some_edges]
with graph.connect() as conn:
conn.remove_many_edges(edge_uids)
Idempotent Mutations
Use safe mutations that don't fail when items already exist or don't exist:
import implica as imp
graph = imp.Graph()
A = imp.var("A")
n_a = imp.node(A)
# Regular add would fail on duplicate
with graph.connect() as conn:
conn.add_node(n_a)
# try_add_node won't fail if node exists
with graph.connect() as conn:
conn.try_add_node(n_a) # No error, even though n_a exists
# Same with edges
B = imp.var("B")
with graph.connect() as conn:
conn.try_add_node(imp.node(B))
comb = imp.Combinator("f", imp.app(A, B))
with graph.connect() as conn:
conn.try_add_edge(comb) # Adds the edge
with graph.connect() as conn:
conn.try_add_edge(comb) # No error, edge already exists
# Safe removal operations
with graph.connect() as conn:
conn.try_remove_node(n_a.uid) # Removes the node
with graph.connect() as conn:
conn.try_remove_node(n_a.uid) # No error, node doesn't exist anymore
# Same with edges
with graph.connect() as conn:
conn.try_remove_edge("some_edge_uid") # No error even if edge doesn't exist
Use Cases:
- Building graphs from multiple sources where duplicates may occur
- Idempotent initialization routines
- Avoiding explicit existence checks before adding or removing elements
- Incremental graph construction without duplicate errors
- Cleanup operations that should be safe to run multiple times
Usage Examples
Example 1: Simple Type Chain
Build a chain of type transformations:
import implica as imp
# Create graph
graph = imp.Graph()
# Define types
types = [imp.var(name) for name in ["A", "B", "C", "D"]]
nodes = [imp.node(t) for t in types]
# Add nodes
with graph.connect() as conn:
for n in nodes:
conn.add_node(n)
# Create transformation chain: A -> B -> C -> D
with graph.connect() as conn:
for i in range(len(nodes) - 1):
comb = imp.Combinator(
name=f"f{i}",
type=imp.app(types[i], types[i + 1])
)
conn.add_edge(imp.edge(nodes[i], nodes[i + 1], comb))
print(f"Created chain with {graph.node_count()} nodes and {graph.edge_count()} edges")
# Query the chain
a_node = graph.get_node_by_type(imp.var("A"))
outgoing = graph.get_outgoing_edges(a_node.uid)
print(f"A has {len(outgoing)} outgoing edge(s)")
Example 2: Complex Type Structure
Work with higher-order functions:
import implica as imp
# Create graph
graph = imp.Graph()
# Define type variables
A = imp.var("A")
B = imp.var("B")
C = imp.var("C")
# Create complex types
# Type 1: A
# Type 2: B
# Type 3: A -> B
# Type 4: (A -> B) -> C
t1 = A
t2 = B
t3 = imp.app(A, B)
t4 = imp.app(t3, C)
nodes = [imp.node(t) for t in [t1, t2, t3, t4]]
# Add nodes
with graph.connect() as conn:
for n in nodes:
conn.add_node(n)
# Add S and K combinators as edges
s_comb = imp.S(A, B, C)
k_comb = imp.K(A, B)
# Note: You would connect these based on your specific logic model
print(f"Created graph with {graph.node_count()} nodes")
print(f"S combinator type: {s_comb.type}")
print(f"K combinator type: {k_comb.type}")
Example 3: Transactional Rollback
Demonstrate automatic rollback on failure:
import implica as imp
graph = imp.Graph()
A = imp.var("A")
B = imp.var("B")
n_a = imp.node(A)
n_b = imp.node(B)
# Add initial nodes
with graph.connect() as conn:
conn.add_node(n_a)
conn.add_node(n_b)
print(f"Initial nodes: {graph.node_count()}") # Output: 2
# Try to add invalid edge (will fail and rollback)
try:
with graph.connect() as conn:
# This will succeed
C = imp.var("C")
n_c = imp.node(C)
conn.add_node(n_c)
# This will fail (trying to add duplicate node)
conn.add_node(n_a) # Already exists!
except RuntimeError as e:
print(f"Transaction failed: {e}")
# Graph remains unchanged
print(f"Nodes after failed transaction: {graph.node_count()}") # Output: 2
print(f"Has C: {graph.get_node_by_type(imp.var('C')) is not None}") # Output: False
Example 4: Building a Proof Tree
Create a structure representing logical derivations:
import implica as imp
# Create a graph representing a proof
graph = imp.Graph()
# Axioms (base types)
P = imp.var("P")
Q = imp.var("Q")
R = imp.var("R")
# Derived types (implications)
PQ = imp.app(P, Q) # P -> Q
QR = imp.app(Q, R) # Q -> R
PR = imp.app(P, R) # P -> R (conclusion)
# Create nodes for each type in the proof
nodes_dict = {
"P": imp.node(P),
"Q": imp.node(Q),
"R": imp.node(R),
"P->Q": imp.node(PQ),
"Q->R": imp.node(QR),
"P->R": imp.node(PR),
}
# Add all nodes
with graph.connect() as conn:
for n in nodes_dict.values():
conn.add_node(n)
# Add inference rules as edges
with graph.connect() as conn:
# Modus Ponens: P, P->Q ⊢ Q
mp1 = imp.Combinator(name="MP1", type=imp.app(P, Q))
conn.add_edge(imp.edge(nodes_dict["P"], nodes_dict["Q"], mp1))
# Modus Ponens: Q, Q->R ⊢ R
mp2 = imp.Combinator(name="MP2", type=imp.app(Q, R))
conn.add_edge(imp.edge(nodes_dict["Q"], nodes_dict["R"], mp2))
# Composition: P->Q, Q->R ⊢ P->R
comp = imp.Combinator(name="Comp", type=imp.app(P, R))
conn.add_edge(imp.edge(nodes_dict["P"], nodes_dict["R"], comp))
print(f"Proof tree has {graph.node_count()} types")
print(f"Proof tree has {graph.edge_count()} inference rules")
# Validate the proof structure
graph.validate()
print("Proof structure is valid!")
Example 5: Exploring Graph Structure
Navigate and analyze the graph:
import implica as imp
# Build a diamond-shaped graph
# A
# / \
# B C
# \ /
# D
graph = imp.Graph()
A, B, C, D = imp.var("A"), imp.var("B"), imp.var("C"), imp.var("D")
n_a, n_b, n_c, n_d = imp.node(A), imp.node(B), imp.node(C), imp.node(D)
with graph.connect() as conn:
conn.add_node(n_a)
conn.add_node(n_b)
conn.add_node(n_c)
conn.add_node(n_d)
# A -> B, A -> C
conn.add_edge(imp.edge(n_a, n_b, imp.Combinator("f1", imp.app(A, B))))
conn.add_edge(imp.edge(n_a, n_c, imp.Combinator("f2", imp.app(A, C))))
# B -> D, C -> D
conn.add_edge(imp.edge(n_b, n_d, imp.Combinator("g1", imp.app(B, D))))
conn.add_edge(imp.edge(n_c, n_d, imp.Combinator("g2", imp.app(C, D))))
# Analyze the structure
print("=== Graph Analysis ===")
print(f"Total nodes: {graph.node_count()}")
print(f"Total edges: {graph.edge_count()}")
# Find all paths from A
print("\nFrom A:")
for e in graph.get_outgoing_edges(n_a.uid):
print(f" -> {e.dst_node.type} via {e.combinator.name}")
# Find all paths to D
print("\nTo D:")
for e in graph.get_incoming_edges(n_d.uid):
print(f" <- {e.src_node.type} via {e.combinator.name}")
# Check connectivity
print("\nNode connectivity:")
for n in graph.nodes():
incoming = len(graph.get_incoming_edges(n.uid))
outgoing = len(graph.get_outgoing_edges(n.uid))
print(f" {n.type}: {incoming} in, {outgoing} out")
Example 6: Bulk Operations
Add multiple nodes and edges efficiently:
import implica as imp
graph = imp.Graph()
# Create many type variables at once
type_vars = [imp.var(f"T{i}") for i in range(10)]
nodes = [imp.node(t) for t in type_vars]
# Add all nodes in a single transaction
with graph.connect() as conn:
conn.add_many_nodes(nodes)
print(f"Added {graph.node_count()} nodes in one operation")
# Create a chain of transformations
combinators = []
for i in range(len(type_vars) - 1):
comb = imp.Combinator(
name=f"f{i}",
type=imp.app(type_vars[i], type_vars[i + 1])
)
combinators.append(comb)
# Add all edges in a single transaction
with graph.connect() as conn:
conn.add_many_edges(combinators)
print(f"Added {graph.edge_count()} edges in one operation")
# If any node or edge fails, all are rolled back atomically
try:
with graph.connect() as conn:
# This will fail because nodes[0] already exists
conn.add_many_nodes([nodes[0], imp.node(imp.var("NEW"))])
except RuntimeError:
print("Transaction rolled back - no new nodes added")
Example 7: Idempotent Operations
Use safe mutations that don't fail on duplicates:
import implica as imp
graph = imp.Graph()
A = imp.var("A")
B = imp.var("B")
n_a = imp.node(A)
n_b = imp.node(B)
# Add nodes normally
with graph.connect() as conn:
conn.add_node(n_a)
conn.add_node(n_b)
print(f"Initial nodes: {graph.node_count()}") # Output: 2
# Try to add nodes again - won't fail!
with graph.connect() as conn:
conn.try_add_node(n_a) # Already exists, but no error
conn.try_add_node(n_b) # Already exists, but no error
conn.try_add_node(imp.node(imp.var("C"))) # New node, will be added
print(f"Nodes after try_add: {graph.node_count()}") # Output: 3
# Same with edges
comb_ab = imp.Combinator("f", imp.app(A, B))
with graph.connect() as conn:
conn.try_add_edge(comb_ab) # Will be added
print(f"Edges: {graph.edge_count()}") # Output: 1
with graph.connect() as conn:
conn.try_add_edge(comb_ab) # Already exists, but no error
print(f"Edges after try_add: {graph.edge_count()}") # Output: 1
# Useful for idempotent operations and avoiding duplicate checks
def ensure_basic_types(graph, type_names):
"""Ensure all basic types exist in the graph."""
with graph.connect() as conn:
for name in type_names:
conn.try_add_node(imp.node(imp.var(name)))
# Can call this multiple times safely
ensure_basic_types(graph, ["A", "B", "C", "D"])
ensure_basic_types(graph, ["C", "D", "E", "F"]) # C and D won't cause errors
print(f"Final nodes: {graph.node_count()}") # Output: 6 (A, B, C, D, E, F)
Example 8: Bulk Removal Operations
Remove multiple elements efficiently:
import implica as imp
graph = imp.Graph()
# Build a graph with multiple nodes and edges
types = [imp.var(f"T{i}") for i in range(10)]
nodes = [imp.node(t) for t in types]
with graph.connect() as conn:
conn.add_many_nodes(nodes)
# Create edges
combinators = []
for i in range(len(types) - 1):
comb = imp.Combinator(f"f{i}", imp.app(types[i], types[i + 1]))
combinators.append(comb)
with graph.connect() as conn:
conn.add_many_edges(combinators)
print(f"Initial: {graph.node_count()} nodes, {graph.edge_count()} edges")
# Output: Initial: 10 nodes, 9 edges
# Remove multiple edges at once
edge_uids = [graph.get_outgoing_edges(nodes[i].uid)[0].uid for i in range(3)]
with graph.connect() as conn:
conn.remove_many_edges(edge_uids)
print(f"After edge removal: {graph.edge_count()} edges")
# Output: After edge removal: 6 edges
# Remove multiple nodes at once (and their connected edges)
node_uids = [nodes[i].uid for i in range(5)]
with graph.connect() as conn:
conn.remove_many_nodes(node_uids)
print(f"After node removal: {graph.node_count()} nodes, {graph.edge_count()} edges")
# Output: After node removal: 5 nodes, 0 edges (edges were connected to removed nodes)
# Safe removal with try_remove (won't fail if already removed)
with graph.connect() as conn:
conn.try_remove_node(nodes[0].uid) # Already removed, no error
conn.try_remove_edge("nonexistent_uid") # Doesn't exist, no error
print(f"Final: {graph.node_count()} nodes") # Output: Final: 5 nodes
Example 9: Safe Cleanup Operations
Use idempotent removal for cleanup tasks:
import implica as imp
def cleanup_temporary_nodes(graph, temp_node_uids):
"""
Remove temporary nodes if they exist.
This function is safe to call multiple times.
"""
with graph.connect() as conn:
for uid in temp_node_uids:
conn.try_remove_node(uid)
graph = imp.Graph()
# Add some nodes
A, B, C = imp.var("A"), imp.var("B"), imp.var("C")
n_a, n_b, n_c = imp.node(A), imp.node(B), imp.node(C)
with graph.connect() as conn:
conn.add_many_nodes([n_a, n_b, n_c])
print(f"Initial nodes: {graph.node_count()}") # Output: 3
# Clean up - safe to call multiple times
temp_uids = [n_a.uid, n_b.uid]
cleanup_temporary_nodes(graph, temp_uids)
print(f"After cleanup: {graph.node_count()}") # Output: 1
# Call again - won't fail even though nodes are already removed
cleanup_temporary_nodes(graph, temp_uids)
print(f"After second cleanup: {graph.node_count()}") # Output: 1
# Combine try_remove with try_add for flexible graph updates
def ensure_graph_state(graph, required_nodes, forbidden_nodes):
"""
Ensure graph has required nodes and doesn't have forbidden ones.
"""
with graph.connect() as conn:
# Add required nodes (idempotent)
for n in required_nodes:
conn.try_add_node(n)
# Remove forbidden nodes (idempotent)
for uid in forbidden_nodes:
conn.try_remove_node(uid)
# Can call this function repeatedly to maintain desired state
ensure_graph_state(
graph,
required_nodes=[imp.node(imp.var("X")), imp.node(imp.var("Y"))],
forbidden_nodes=[n_c.uid]
)
print(f"Final state: {graph.node_count()} nodes") # Output: 2 (X and Y)
Example 10: Using Schema-Based API for Pattern Matching
Use TypeSchemas to query and manipulate the graph based on type patterns:
import implica as imp
# Create a graph with various types
graph = imp.Graph()
# Add variable and application nodes
A = imp.var("A")
B = imp.var("B")
C = imp.var("C")
AB = imp.app(A, B)
BC = imp.app(B, C)
ABC = imp.app(AB, C)
with graph.connect() as conn:
conn.add_many_nodes(
types=[A, B, C, AB, BC, ABC],
properties=[
{"kind": "variable", "priority": 1},
{"kind": "variable", "priority": 1},
{"kind": "variable", "priority": 1},
{"kind": "application", "priority": 2},
{"kind": "application", "priority": 2},
{"kind": "application", "priority": 3},
]
)
# Create a schema to match application types (X -> Y)
app_schema = imp.TypeSchema(pattern=imp.app(imp.var("X"), imp.var("Y")))
# Query: Get all application nodes with their bindings
matches = graph.get_nodes_matching_schema(app_schema)
print(f"\nApplication nodes found: {len(matches)}")
for node, match in matches:
print(f" {node.type}: X={match.bindings['X']}, Y={match.bindings['Y']}")
# Output:
# Application nodes found: 3
# A -> B: X=A, Y=B
# B -> C: X=B, Y=C
# (A -> B) -> C: X=(A -> B), Y=C
# Filter: Get only high-priority application nodes
high_priority_apps = graph.filter_nodes_by_schema(
app_schema,
properties={"priority": 3}
)
print(f"\nHigh priority applications: {len(high_priority_apps)}")
# Output: High priority applications: 1 (the nested one)
# Count application nodes
app_count = graph.count_nodes_matching_schema(app_schema)
print(f"Total applications: {app_count}") # Output: 3
# Mutation: Update all application nodes
with graph.connect() as conn:
conn.update_nodes_matching_schema(
app_schema,
{"processed": True, "timestamp": 123456}
)
# Verify updates
for node in graph.filter_nodes_by_schema(app_schema):
print(f"{node.type}: processed={node.properties.get('processed')}")
# Mutation: Add only application types from a list
new_types = [
imp.var("D"),
imp.app(imp.var("D"), imp.var("E")),
imp.var("F"),
]
with graph.connect() as conn:
# Only the application will be added
conn.add_nodes_for_schema(app_schema, new_types, properties={"new": True})
print(f"Nodes after selective add: {graph.node_count()}")
# Output: 7 (6 original + 1 new application)
# Mutation: Remove temporary application nodes
with graph.connect() as conn:
conn.try_remove_nodes_matching_schema(
app_schema,
properties={"new": True}
)
print(f"Nodes after cleanup: {graph.node_count()}")
# Output: 6 (back to original)
# Edge filtering with schemas
D = imp.var("D")
E = imp.var("E")
with graph.connect() as conn:
conn.add_node(type=D)
conn.add_node(type=E)
conn.add_edge(imp.Combinator("f1", imp.app(A, B)))
conn.add_edge(imp.Combinator("f2", imp.app(AB, C)))
conn.add_edge(imp.Combinator("f3", imp.app(D, E)))
# Find edges where source is an application
edges_from_apps = graph.filter_edges_by_schema(src_schema=app_schema)
print(f"\nEdges from applications: {len(edges_from_apps)}")
# Output: 1 (the edge from AB to C)
# Find edges where destination is a variable
var_schema = imp.TypeSchema(pattern=imp.var("Z"))
edges_to_vars = graph.filter_edges_by_schema(dst_schema=var_schema)
print(f"Edges to variables: {len(edges_to_vars)}")
# Output: 2 (A->B and D->E both go to variables)
# Complex pattern: nested applications like (X -> Y) -> Z
nested_schema = imp.TypeSchema(
pattern=imp.app(imp.app(imp.var("X"), imp.var("Y")), imp.var("Z"))
)
nested_matches = graph.get_nodes_matching_schema(nested_schema)
print(f"\nNested applications: {len(nested_matches)}")
for node, match in nested_matches:
print(f" {node.type}")
print(f" X={match.bindings['X']}, Y={match.bindings['Y']}, Z={match.bindings['Z']}")
# Output:
# Nested applications: 1
# (A -> B) -> C
# X=A, Y=B, Z=C
Schema-Based API Benefits:
- Pattern Matching: Find types by their structure, not just exact matches
- Variable Bindings: Extract components from complex types
- Bulk Operations: Add, remove, or update multiple nodes/edges matching a pattern
- Property Filtering: Combine pattern matching with property constraints
- Transactional Safety: All mutations are transactional with automatic rollback
Important Note: A single variable pattern like var("X") matches ALL types (both Variables and Applications) because TypeSchema uses unification. To filter only application types, use an application pattern like app(var("X"), var("Y")).
Example 11: Analyzing Type Variables
Extract and analyze variables from complex type expressions:
import implica as imp
# Create a complex type expression
A = imp.var("A")
B = imp.var("B")
C = imp.var("C")
# ((A -> B) -> C) -> (A -> B)
inner = imp.app(A, B)
middle = imp.app(inner, C)
complex_type = imp.app(middle, inner)
print(f"Type: {complex_type}")
# Output: ((A -> B) -> C) -> A -> B
# Get all variables (including duplicates)
variables = complex_type.variables
print(f"All variables: {[v.name for v in variables]}")
# Output: ['A', 'B', 'C', 'A', 'B']
# Count unique variables
unique_vars = {v.name for v in variables}
print(f"Unique variables: {unique_vars}")
# Output: {'A', 'B', 'C'}
# Count occurrences
from collections import Counter
var_counts = Counter(v.name for v in variables)
print(f"Variable counts: {dict(var_counts)}")
# Output: {'A': 2, 'B': 2, 'C': 1}
# Check if a specific variable is used
def uses_variable(type_expr, var_name):
"""Check if a type expression uses a specific variable."""
return any(v.name == var_name for v in type_expr.variables)
print(f"Uses A: {uses_variable(complex_type, 'A')}") # Output: True
print(f"Uses D: {uses_variable(complex_type, 'D')}") # Output: False
# Find all types in a graph that use a specific variable
graph = imp.Graph()
# Create various types
types_to_add = [
imp.var("X"),
imp.var("Y"),
imp.app(A, B),
imp.app(B, C),
imp.app(A, imp.app(B, C)),
]
with graph.connect() as conn:
for t in types_to_add:
conn.add_node(imp.node(t))
# Find all nodes containing variable "B"
nodes_with_B = [
n for n in graph.nodes()
if any(v.name == "B" for v in n.type.variables)
]
print(f"\nNodes containing variable B:")
for n in nodes_with_B:
print(f" - {n.type}")
# Output:
# - A -> B
# - B -> C
# - A -> B -> C
# Calculate the "complexity" of a type by counting its variables
def type_complexity(type_expr):
"""Calculate complexity as the total number of variables."""
return len(type_expr.variables)
print(f"\nType complexities:")
for n in graph.nodes():
print(f" {n.type}: {type_complexity(n.type)}")
# Output:
# X: 1
# Y: 1
# A -> B: 2
# B -> C: 2
# A -> B -> C: 3
Example 12: Using type_from_string for Dynamic Type Creation
Parse types from strings for flexible, dynamic type construction:
import implica as imp
# Create a graph
graph = imp.Graph()
# Define type expressions as strings (e.g., from a config file or user input)
type_strings = [
"Person",
"String",
"Int",
"Person -> String", # Get name
"Person -> Int", # Get age
"(Person -> String) -> (Person -> Int) -> Person", # Complex transformation
]
# Parse and add all types to the graph
nodes = []
for type_str in type_strings:
parsed_type = imp.type_from_string(type_str)
node = imp.node(parsed_type)
nodes.append(node)
with graph.connect() as conn:
conn.add_many_nodes(nodes)
print(f"Created graph with {graph.node_count()} nodes from string definitions")
# Output: Created graph with 6 nodes from string definitions
# You can also build a type library from a configuration
type_library = {
"identity": "A -> A",
"const": "A -> B -> A",
"compose": "(B -> C) -> (A -> B) -> A -> C",
"apply": "(A -> B) -> A -> B",
}
print("\n=== Type Library ===")
for name, type_str in type_library.items():
parsed = imp.type_from_string(type_str)
print(f"{name:10} : {parsed}")
# Output:
# identity : A -> A
# const : A -> B -> A
# compose : (B -> C) -> (A -> B) -> A -> C
# apply : (A -> B) -> A -> B
# Validate type strings from user input
def validate_and_parse_type(user_input: str) -> tuple[bool, str, object]:
"""
Validate and parse a type string from user input.
Returns (is_valid, message, parsed_type)
"""
try:
parsed = imp.type_from_string(user_input)
return True, f"Valid type: {parsed}", parsed
except ValueError as e:
return False, f"Invalid type: {e}", None
# Test validation
test_inputs = [
"A -> B",
"(A -> B) -> C",
"A ->", # Invalid
"Person -> (Address -> String)",
"A @ B", # Invalid
]
print("\n=== Type Validation ===")
for test in test_inputs:
is_valid, message, parsed = validate_and_parse_type(test)
status = "✓" if is_valid else "✗"
print(f"{status} '{test:30}' : {message}")
# Output:
# ✓ 'A -> B' : Valid type: A -> B
# ✓ '(A -> B) -> C' : Valid type: (A -> B) -> C
# ✗ 'A ->' : Invalid type: Unexpected end of input
# ✓ 'Person -> (Address -> String)' : Valid type: Person -> Address -> String
# ✗ 'A @ B' : Invalid type: Invalid character '@' at position 2
# Build a type inference system
def infer_result_type(func_type_str: str, input_type_str: str) -> str:
"""
Given a function type and an input type, infer the result type.
Returns the result type as a string, or an error message.
"""
try:
func_type = imp.type_from_string(func_type_str)
input_type = imp.type_from_string(input_type_str)
# Check if func_type is an application
if not isinstance(func_type, imp.Application):
return f"Error: {func_type_str} is not a function type"
# Check if input matches function's input type
if func_type.input_type.uid != input_type.uid:
return f"Error: Type mismatch. Expected {func_type.input_type}, got {input_type}"
return str(func_type.output_type)
except ValueError as e:
return f"Error: {e}"
# Test type inference
print("\n=== Type Inference ===")
print(f"Apply 'A -> B' to 'A': {infer_result_type('A -> B', 'A')}")
# Output: B
print(f"Apply 'Person -> String' to 'Person': {infer_result_type('Person -> String', 'Person')}")
# Output: String
print(f"Apply 'A -> B -> C' to 'A': {infer_result_type('A -> B -> C', 'A')}")
# Output: B -> C
print(f"Apply 'A -> B' to 'C': {infer_result_type('A -> B', 'C')}")
# Output: Error: Type mismatch. Expected A, got C
# Parse types from a domain-specific language
dsl_program = """
define Input
define Output
define Processor = Input -> Output
define Chain = Processor -> Processor -> Processor
"""
print("\n=== Parsing DSL ===")
for line in dsl_program.strip().split('\n'):
line = line.strip()
if line.startswith("define "):
parts = line[7:].split(" = ")
type_name = parts[0].strip()
if len(parts) == 1:
# Simple type definition
print(f"{type_name}: <primitive type>")
else:
# Complex type definition
type_expr = parts[1].strip()
parsed = imp.type_from_string(type_expr)
print(f"{type_name}: {parsed}")
# Output:
# Input: <primitive type>
# Output: <primitive type>
# Processor: Input -> Output
# Chain: Processor -> Processor -> Processor
Schema-Based Graph API
This section explains how to use the TypeSchema-based API for querying and manipulating the implicational logic graph. The schema-based API provides powerful pattern matching capabilities for working with nodes and edges based on their type structure.
Understanding TypeSchemas
TypeSchemas provide pattern matching for types using unification. A schema contains a pattern with type variables that can be matched against concrete types.
Key Concepts
- Pattern Variables (no prefix): A variable like
XorYacts as a wildcard that matches any type (Variable or Application) - Exact Match Variables (with
$prefix): A variable like$Aor$Personmatches only the specific named type - Structural Matching: A pattern like
X -> Ymatches any Application type with inputXand outputY - Variable Bindings: When a match succeeds, pattern variables are bound to the matched types
Pattern Matching Modes
import implica as imp
# Pattern variable - matches ALL types
any_schema = imp.TypeSchema(pattern=imp.var("X"))
# This will match Variables, Applications, everything
# Exact match - matches only specific type
exact_schema = imp.TypeSchema(pattern=imp.var("$A"))
# This will match ONLY Variable with name "A"
# Application pattern - matches only Application types
app_schema = imp.TypeSchema(pattern=imp.app(imp.var("X"), imp.var("Y")))
# This will match only Application types (function types)
# Mixed pattern - exact source, pattern destination
mixed_schema = imp.TypeSchema(pattern=imp.app(imp.var("$Person"), imp.var("Y")))
# This matches Applications where input is exactly "Person", output is anything
Use cases for exact matching ($ prefix):
- Filter nodes by specific named types:
filter_nodes_by_schema(TypeSchema(var("$Int"))) - Find edges from/to specific types:
filter_edges_by_schema(src_schema=TypeSchema(var("$String"))) - Remove or update specific named types while preserving others
- Combine exact and pattern matching for precise queries
Schema Query Operations
Get Nodes Matching Schema
Retrieve all nodes whose types match a schema pattern, along with variable bindings:
import implica as imp
g = imp.Graph()
# Add nodes
a = imp.var("A")
b = imp.var("B")
ab = imp.app(a, b)
with g.connect() as conn:
conn.add_many_nodes(types=[a, b, ab])
# Match all application types (X -> Y)
app_schema = imp.TypeSchema(pattern=imp.app(imp.var("X"), imp.var("Y")))
matches = g.get_nodes_matching_schema(app_schema)
for node, match in matches:
print(f"Node: {node.type}")
print(f"Bindings: {match.bindings}")
# Bindings will show what X and Y matched
Filter Nodes by Schema
Filter nodes by schema and optionally by properties:
# Filter application types with specific properties
app_schema = imp.TypeSchema(pattern=imp.app(imp.var("X"), imp.var("Y")))
nodes = g.filter_nodes_by_schema(app_schema, properties={"weight": 1.0})
# All returned nodes will have application types AND weight=1.0
Filter Edges by Schema
Filter edges by matching source node type, destination node type, and/or combinator type:
# Filter edges where source is an application
src_schema = imp.TypeSchema(pattern=imp.app(imp.var("X"), imp.var("Y")))
edges = g.filter_edges_by_schema(src_schema=src_schema)
# Filter edges with source application AND destination variable
dst_schema = imp.TypeSchema(pattern=imp.var("Z"))
edges = g.filter_edges_by_schema(
src_schema=src_schema,
dst_schema=dst_schema
)
# Add property filtering
edges = g.filter_edges_by_schema(
src_schema=src_schema,
properties={"label": "main"}
)
Count and Check Existence
# Count nodes matching schema
app_schema = imp.TypeSchema(pattern=imp.app(imp.var("X"), imp.var("Y")))
count = g.count_nodes_matching_schema(app_schema)
# Check if any node matches
has_apps = g.has_node_matching_schema(app_schema)
# Count edges matching schema
count = g.count_edges_matching_schema(src_schema=app_schema)
# Check if any edge matches
has_edges = g.has_edge_matching_schema(dst_schema=app_schema)
Schema Mutation Operations
All mutation operations use the Connection API for transactional safety. Schema-based mutations query the graph at mutation-building time.
Add Nodes for Schema
Add only those nodes from a list whose types match a schema:
import implica as imp
g = imp.Graph()
# List of types to potentially add
types = [imp.var("A"), imp.var("B"), imp.app(imp.var("A"), imp.var("B"))]
# Only add application types
app_schema = imp.TypeSchema(pattern=imp.app(imp.var("X"), imp.var("Y")))
with g.connect() as conn:
conn.add_nodes_for_schema(app_schema, types)
# Result: Only the Application node is added
With properties:
with g.connect() as conn:
conn.add_nodes_for_schema(
app_schema,
types,
properties={"source": "generated"}
)
Remove Nodes Matching Schema
Remove all nodes in the graph whose types match a schema:
# Remove all application nodes
app_schema = imp.TypeSchema(pattern=imp.app(imp.var("X"), imp.var("Y")))
with g.connect() as conn:
conn.remove_nodes_matching_schema(app_schema)
# With property filter
with g.connect() as conn:
conn.remove_nodes_matching_schema(
app_schema,
properties={"temporary": True}
)
Remove Edges Matching Schema
Remove all edges matching schema patterns:
# Remove edges where source is an application
app_schema = imp.TypeSchema(pattern=imp.app(imp.var("X"), imp.var("Y")))
with g.connect() as conn:
conn.remove_edges_matching_schema(src_schema=app_schema)
# Remove edges with specific source AND destination patterns
with g.connect() as conn:
conn.remove_edges_matching_schema(
src_schema=app_schema,
dst_schema=imp.TypeSchema(pattern=imp.var("Z"))
)
Update Nodes/Edges Matching Schema
Update properties of all matching nodes or edges:
# Update all application nodes
app_schema = imp.TypeSchema(pattern=imp.app(imp.var("X"), imp.var("Y")))
with g.connect() as conn:
conn.update_nodes_matching_schema(
app_schema,
{"processed": True, "depth": 1}
)
# Update with filter
with g.connect() as conn:
conn.update_nodes_matching_schema(
app_schema,
{"status": "done"},
filter_properties={"status": "pending"}
)
# Update edges
with g.connect() as conn:
conn.update_edges_matching_schema(
{"weight": 2.0},
src_schema=app_schema
)
Try Variants
All mutation operations have try_* variants that don't fail if elements don't exist:
with g.connect() as conn:
# Won't fail if nodes don't exist
conn.try_remove_nodes_matching_schema(app_schema)
# Won't fail if edges don't exist
conn.try_remove_edges_matching_schema(src_schema=app_schema)
# Won't fail for non-existent nodes
conn.try_update_nodes_matching_schema(app_schema, {"value": 1})
Schema API Best Practices
1. Schema Pattern Design
import implica as imp
# ❌ Don't use pattern variable alone to filter specific Variable types
# Pattern variables match ALL types, not just Variables
var_schema = imp.TypeSchema(pattern=imp.var("X"))
nodes = g.filter_nodes_by_schema(var_schema) # Returns ALL nodes (Variables AND Applications)
# ✓ Use exact match to filter specific named Variables
exact_schema = imp.TypeSchema(pattern=imp.var("$Int"))
int_nodes = g.filter_nodes_by_schema(exact_schema) # Returns only Variable nodes named "Int"
# ✓ Use application pattern to filter Applications
app_schema = imp.TypeSchema(pattern=imp.app(imp.var("X"), imp.var("Y")))
apps = g.filter_nodes_by_schema(app_schema) # Returns only Application nodes
# ✓ Mix exact and pattern matching for precise control
mixed_schema = imp.TypeSchema(pattern=imp.app(imp.var("$Person"), imp.var("Y")))
person_funcs = g.filter_nodes_by_schema(mixed_schema) # Returns only "Person -> Y" Applications
# ✓ Use specific patterns for more complex matching
nested_schema = imp.TypeSchema(pattern=imp.app(imp.app(imp.var("X"), imp.var("Y")), imp.var("Z")))
nested = g.filter_nodes_by_schema(nested_schema) # Returns (X -> Y) -> Z types
Pattern Selection Guide:
| Goal | Pattern | Example |
|---|---|---|
| Match specific Variable | var("$Name") |
var("$Int") matches only "Int" |
| Match any Variable | Use exact match for each name | Iterate over known names |
| Match any Application | app(var("X"), var("Y")) |
Matches all function types |
| Match specific structure | Exact types in pattern | app(var("$A"), var("$B")) |
| Extract components | Pattern variables | app(var("X"), var("Y")) with bindings |
2. Combine Schema and Property Filters
# Use both schema and properties for precise filtering
app_schema = imp.TypeSchema(pattern=imp.app(imp.var("X"), imp.var("Y")))
nodes = g.filter_nodes_by_schema(
app_schema,
properties={"processed": False, "priority": "high"}
)
3. Use Transactions for Safety
# Always use context manager for mutations
with g.connect() as conn:
conn.remove_nodes_matching_schema(schema)
conn.add_nodes_for_schema(other_schema, new_types)
# Automatic rollback on error
4. Check Before Bulk Operations
# Check count before removing
app_schema = imp.TypeSchema(pattern=imp.app(imp.var("X"), imp.var("Y")))
count = g.count_nodes_matching_schema(app_schema)
print(f"Will remove {count} nodes")
if count > 0:
with g.connect() as conn:
conn.remove_nodes_matching_schema(app_schema)
5. Use Try Variants for Idempotent Operations
# Use try_* for operations that might not find matches
with g.connect() as conn:
# Safe even if no matches
conn.try_remove_nodes_matching_schema(schema)
conn.try_update_edges_matching_schema(props, src_schema=schema)
6. Leverage Variable Bindings
# Access variable bindings from matches
app_schema = imp.TypeSchema(pattern=imp.app(imp.var("X"), imp.var("Y")))
matches = g.get_nodes_matching_schema(app_schema)
for node, match in matches:
input_type = match.bindings["X"]
output_type = match.bindings["Y"]
print(f"Edge from {input_type} to {output_type}")
Schema API Summary
The schema-based API provides a powerful and expressive way to query and manipulate graphs based on type patterns. By combining schema matching with property filtering, you can perform complex operations on graph structures efficiently and safely.
Key takeaways:
- Schemas use unification, not type discrimination
- Pattern variables (no prefix) match all types - use for generic matching
- Exact match variables (
$prefix) match specific named types - use for precise filtering - Single pattern variable patterns match all types (both Variables and Applications)
- Use application patterns
app(var("X"), var("Y"))to filter only Application types - Use exact match
var("$TypeName")to filter only a specific Variable type - Combine exact and pattern matching for flexible queries
- Combine schemas with property filters for precise control
- Always use transactions for mutations
- Use
try_*variants for robust error handling
Quick Reference:
# Match everything
TypeSchema(var("X")) # Wildcard - matches all types
# Match specific Variable
TypeSchema(var("$Int")) # Exact - matches only "Int" Variable
# Match all Applications
TypeSchema(app(var("X"), var("Y"))) # Structural pattern
# Match specific structure
TypeSchema(app(var("$A"), var("$B"))) # Exact input and output
# Mixed matching
TypeSchema(app(var("$Person"), var("Y"))) # Exact input, any output
API Reference
Core Module (implica.core)
Types:
var(name: str) -> Variable: Create a type variableapp(input_type: BaseType, output_type: BaseType) -> Application: Create a function typetype_from_string(type_str: str) -> BaseType: Parse a type from a string representationVariable: Atomic type variablename: str: The name of the variableuid: str: Unique identifier (SHA256 hash)variables: list[Variable]: Returns[self]
Application: Function application typeinput_type: BaseType: Input type of the functionoutput_type: BaseType: Output type of the functionuid: str: Unique identifier (SHA256 hash)variables: list[Variable]: Returns all variables from input and output types (may include duplicates)
Combinators:
S(A, B, C) -> Combinator: Create S combinator with type(A -> B -> C) -> (A -> B) -> A -> CK(A, B) -> Combinator: Create K combinator with typeA -> B -> Acombinator(name: str, type: Application) -> Combinator: Create a custom combinatorCombinator: Generic combinator with name and Application typename: str: Name/identifier of the combinatortype: Application: Must be an Application type (function type), not a Variable- Raises
ValidationErrorif type is not an Application
Type Schemas:
schema(pattern: BaseType) -> TypeSchema: Helper function to create a TypeSchemaTypeSchema(pattern: BaseType): Create a type schema for pattern matchingpattern: BaseType: The type pattern containing variables to match againstmatch(target: BaseType) -> Optional[TypeMatch]: Try to match a type against the patternget_pattern_variables() -> Set[str]: Get all variable names in the pattern
TypeMatch: Result of a successful pattern matchoriginal_type: BaseType: The type that was successfully matchedbindings: Dict[str, BaseType]: Mapping from variable names to their matched types
Graph Module (implica.graph)
Elements:
node(type: BaseType, properties: dict[str, Any] = None) -> Node: Create a node from any type (Variable or Application)properties: Optional dict of metadata to attach to the node
edge(combinator: Combinator, properties: dict[str, Any] = None) -> Edge: Create an edge from a combinator (automatically infers source/destination nodes)properties: Optional dict of metadata to attach to the edge- Raises
ValueErrorif combinator doesn't have an Application type
Node: Graph node representing a typetype: BaseType: The type this node represents (can be Variable or Application)uid: str: Unique identifierproperties: dict[str, Any]: Metadata dictionary (empty by default)
Edge: Graph edge representing a transformation via a combinatorsrc_node: Node: Source node (must match combinator's input type)dst_node: Node: Destination node (must match combinator's output type)combinator: Combinator: The combinator (must have Application type)properties: dict[str, Any]: Metadata dictionary (empty by default)- Raises
ValueErrorif:- Combinator type is not an Application
- Input/output types don't match source/destination nodes
Graph:
Graph(): Create a new empty graphgraph.connect() -> Connection: Create a transactional connectiongraph.validate() -> bool: Validate graph consistencygraph.has_node(uid: str) -> bool: Check if node existsgraph.get_node(uid: str) -> Node: Get node by UIDgraph.get_node_by_type(type: BaseType) -> Optional[Node]: Get node by typegraph.get_outgoing_edges(uid: str) -> list[Edge]: Get outgoing edgesgraph.get_incoming_edges(uid: str) -> list[Edge]: Get incoming edgesgraph.nodes() -> Iterator[Node]: Iterate over nodesgraph.edges() -> Iterator[Edge]: Iterate over edgesgraph.node_count() -> int: Get number of nodesgraph.edge_count() -> int: Get number of edges
Schema-Based Query Methods:
graph.get_nodes_matching_schema(schema: TypeSchema) -> list[tuple[Node, TypeMatch]]: Get all nodes matching a schema with their variable bindingsgraph.filter_nodes_by_schema(schema: TypeSchema, properties: dict[str, Any] = None) -> list[Node]: Filter nodes by schema and optionally by propertiesgraph.get_edges_matching_schema(src_schema: TypeSchema = None, dst_schema: TypeSchema = None, combinator_schema: TypeSchema = None) -> list[tuple[Edge, dict[str, TypeMatch]]]: Get edges matching schemas with bindingsgraph.filter_edges_by_schema(src_schema: TypeSchema = None, dst_schema: TypeSchema = None, combinator_schema: TypeSchema = None, properties: dict[str, Any] = None) -> list[Edge]: Filter edges by schemas and propertiesgraph.count_nodes_matching_schema(schema: TypeSchema, properties: dict[str, Any] = None) -> int: Count nodes matching schemagraph.has_node_matching_schema(schema: TypeSchema, properties: dict[str, Any] = None) -> bool: Check if any node matches schemagraph.count_edges_matching_schema(src_schema: TypeSchema = None, dst_schema: TypeSchema = None, combinator_schema: TypeSchema = None, properties: dict[str, Any] = None) -> int: Count edges matching schemasgraph.has_edge_matching_schema(src_schema: TypeSchema = None, dst_schema: TypeSchema = None, combinator_schema: TypeSchema = None, properties: dict[str, Any] = None) -> bool: Check if any edge matches schemas
Connection:
Connection: Transactional graph modification contextconn.add_node(node: Node = None, *, type: BaseType = None, properties: dict[str, Any] = None) -> Connection: Queue node addition- Pass either
node(pre-created) ortypeand optionalproperties(creates node)
- Pass either
conn.add_edge(combinator: Combinator, properties: dict[str, Any] = None) -> Connection: Queue edge addition- Creates edge from combinator with optional properties
conn.remove_node(uid: str) -> Connection: Queue node removalconn.remove_edge(uid: str) -> Connection: Queue edge removalconn.add_many_nodes(nodes: list[Node] = None, *, types: list[BaseType] = None, properties: dict[str, Any] | list[dict[str, Any]] = None) -> Connection: Queue multiple node additions- Pass either
nodes(pre-created) ortypeswith optionalproperties propertiescan be single dict (applied to all) or list of dicts (one per node)- If list, length must match
typeslength
- Pass either
conn.add_many_edges(combinators: list[Combinator], properties: dict[str, Any] | list[dict[str, Any]] = None) -> Connection: Queue multiple edge additionspropertiescan be single dict (applied to all) or list of dicts (one per edge)- If list, length must match
combinatorslength
conn.try_add_node(node: Node = None, *, type: BaseType = None, properties: dict[str, Any] = None) -> Connection: Queue node addition (no error if exists)conn.try_add_edge(combinator: Combinator, properties: dict[str, Any] = None) -> Connection: Queue edge addition (no error if exists)conn.try_add_many_nodes(nodes: list[Node] = None, *, types: list[BaseType] = None, properties: dict[str, Any] | list[dict[str, Any]] = None) -> Connection: Queue multiple node additions (no error if any exist)conn.try_add_many_edges(combinators: list[Combinator], properties: dict[str, Any] | list[dict[str, Any]] = None) -> Connection: Queue multiple edge additions (no error if any exist)conn.remove_many_nodes(node_uids: list[str]) -> Connection: Queue multiple node removalsconn.remove_many_edges(edge_uids: list[str]) -> Connection: Queue multiple edge removalsconn.try_remove_node(node_uid: str) -> Connection: Queue node removal (no error if not exists)conn.try_remove_edge(edge_uid: str) -> Connection: Queue edge removal (no error if not exists)conn.commit(): Apply all queued operationsconn.rollback(): Discard all queued operations
Schema-Based Mutation Methods:
conn.add_nodes_for_schema(schema: TypeSchema, types: list[BaseType], properties: dict[str, Any] = None) -> Connection: Add only types matching schemaconn.try_add_nodes_for_schema(schema: TypeSchema, types: list[BaseType], properties: dict[str, Any] = None) -> Connection: Add matching types (no error if exist)conn.remove_nodes_matching_schema(schema: TypeSchema, properties: dict[str, Any] = None) -> Connection: Remove all nodes matching schemaconn.try_remove_nodes_matching_schema(schema: TypeSchema, properties: dict[str, Any] = None) -> Connection: Remove matching nodes (no error if not exist)conn.remove_edges_matching_schema(src_schema: TypeSchema = None, dst_schema: TypeSchema = None, combinator_schema: TypeSchema = None, properties: dict[str, Any] = None) -> Connection: Remove edges matching schemasconn.try_remove_edges_matching_schema(src_schema: TypeSchema = None, dst_schema: TypeSchema = None, combinator_schema: TypeSchema = None, properties: dict[str, Any] = None) -> Connection: Remove matching edges (no error if not exist)conn.update_nodes_matching_schema(schema: TypeSchema, new_properties: dict[str, Any], filter_properties: dict[str, Any] = None) -> Connection: Update properties of nodes matching schemaconn.try_update_nodes_matching_schema(schema: TypeSchema, new_properties: dict[str, Any], filter_properties: dict[str, Any] = None) -> Connection: Update matching nodes (no error if not exist)conn.update_edges_matching_schema(new_properties: dict[str, Any], src_schema: TypeSchema = None, dst_schema: TypeSchema = None, combinator_schema: TypeSchema = None, filter_properties: dict[str, Any] = None) -> Connection: Update properties of edges matching schemasconn.try_update_edges_matching_schema(new_properties: dict[str, Any], src_schema: TypeSchema = None, dst_schema: TypeSchema = None, combinator_schema: TypeSchema = None, filter_properties: dict[str, Any] = None) -> Connection: Update matching edges (no error if not exist)
Mutations Module (implica.mutations)
Mutation: Abstract base class for all mutationsAddNode(node): Add a single nodeRemoveNode(node_uid): Remove a single nodeAddEdge(edge): Add a single edgeRemoveEdge(edge_uid): Remove a single edgeAddManyNodes(nodes): Add multiple nodes atomicallyAddManyEdges(edges): Add multiple edges atomicallyTryAddNode(node): Add a node or do nothing if it existsTryAddEdge(edge): Add an edge or do nothing if it existsRemoveManyNodes(node_uids): Remove multiple nodes atomicallyRemoveManyEdges(edge_uids): Remove multiple edges atomicallyTryRemoveNode(node_uid): Remove a node or do nothing if it doesn't existTryRemoveEdge(edge_uid): Remove an edge or do nothing if it doesn't exist
Utilities Module (implica.utils)
Parsing Functions:
tokenize(type_str: str) -> list[str]: Tokenize a type string into tokens- Returns a list of tokens: variable names,
'(',')', and'->' - Raises
ValueErrorif invalid characters are found
- Returns a list of tokens: variable names,
parse_type(tokens: list[str]) -> tuple[BaseType, list[str]]: Parse tokens into a type- Uses recursive descent parsing with right-associativity for arrows
- Returns the parsed type and any remaining tokens
- Raises
ValueErrorif tokens cannot be parsed
parse_primary(tokens: list[str]) -> tuple[BaseType, list[str]]: Parse a primary type- Handles variables and parenthesized types
- Returns the parsed type and remaining tokens
- Raises
ValueErrorif tokens cannot be parsed
Note: These functions are primarily used internally by type_from_string(), but are exposed for advanced use cases where you need direct control over the parsing process.
Example:
from implica.utils import tokenize, parse_type
# Tokenize a type string
tokens = tokenize("(A -> B) -> C")
print(tokens) # Output: ['(', 'A', '->', 'B', ')', '->', 'C']
# Parse the tokens
type_result, remaining = parse_type(tokens)
print(type_result) # Output: (A -> B) -> C
print(remaining) # Output: []
# Custom parsing workflow
def parse_and_validate(type_str: str) -> bool:
"""Check if a string is a valid type expression."""
try:
tokens = tokenize(type_str)
result, remaining = parse_type(tokens)
return len(remaining) == 0 # Valid if no tokens remain
except ValueError:
return False
print(parse_and_validate("A -> B")) # Output: True
print(parse_and_validate("A ->")) # Output: False
print(parse_and_validate("(A -> B) -> C")) # Output: True
Development
Setup
# Clone the repository
git clone https://github.com/carlosFerLo/implicational-logic-graph.git
cd implicational-logic-graph
# Install dependencies
poetry install
# Run tests
poetry run pytest
# Run tests with coverage
poetry run pytest --cov=src/implicational_logic_graph
Running Tests
# Run all tests
poetry run pytest
# Run specific test file
poetry run pytest tests/test_graph.py
# Run with verbose output
poetry run pytest -v
# Run with coverage report
poetry run pytest --cov=src/implica --cov-report=html
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
- Fork the repository
- Create your feature branch (
git checkout -b feature/amazing-feature) - Commit your changes (
git commit -m 'Add some amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
License
This project is licensed under the MIT License - see the LICENSE file for details.
Acknowledgments
- Based on concepts from combinatory logic and type theory
- Inspired by minimal implicational logic models
- Built with Pydantic for data validation
Citation
If you use this library in your research, please cite:
@software{implica,
author = {Carlos Fernandez},
title = {Implica: Implicational Logic Graph Library},
year = {2025},
url = {https://github.com/CarlosFerLo/implicational-logic-graph}
}
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file implica-0.4.2.tar.gz.
File metadata
- Download URL: implica-0.4.2.tar.gz
- Upload date:
- Size: 58.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.3 CPython/3.10.14 Darwin/24.3.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
5a6eee330f052c4a3549c79211d32f1122461ee892a041fb9602087b512b31f3
|
|
| MD5 |
582c3683c1129692dc8becaeee62e62c
|
|
| BLAKE2b-256 |
7a161719626024d5afc37dda535018ff55de110b07e1362cf969c4f7e95984c0
|
File details
Details for the file implica-0.4.2-py3-none-any.whl.
File metadata
- Download URL: implica-0.4.2-py3-none-any.whl
- Upload date:
- Size: 55.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.3 CPython/3.10.14 Darwin/24.3.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
db9125edda57af4b0d801fde39ae7899877313a4088f1e7af7ab1ebe7a32aa68
|
|
| MD5 |
9061700f87685a91c4b09653ece6c7e7
|
|
| BLAKE2b-256 |
ccbf26fb030895bfa3cc36f8b1bbbcb9d7f0c7498829b7b0fe0bb6a1f22e7124
|