A minimalist, type-safe actor library for Python 3.12+ with distributed cluster support
Casty
Casty is a minimalist, type-safe actor library for Python 3.12+ with distributed cluster support. It implements the actor model with an emphasis on ease of use, and uses Python's modern type system to make message passing type-safe.
What is the Actor Model?
The actor model is a concurrent programming paradigm where the fundamental unit of computation is the actor. Each actor is an isolated entity that has its own internal state, processes messages sequentially through a mailbox, and can create other actors or send messages to known actors. There is no shared memory between actors — all communication happens exclusively through asynchronous message passing.
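This model greatly simplifies concurrent programming. Because actors share no memory, there are no data races on shared state and no locks to acquire in the wrong order, which removes the classic sources of lock-related deadlocks (although actors that wait on each other's replies can still block one another). Casty brings this paradigm to the Python ecosystem in an idiomatic way, leveraging asyncio and the modern type system.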
Installation
Using uv:
```bash
uv add git+https://github.com/gabfssilva/casty.git
```
Or using pip:
```bash
pip install git+https://github.com/gabfssilva/casty.git
```
Casty requires Python 3.12 or higher because it uses the PEP 695 type parameter syntax for generics.
Core Concepts
Actors and Messages
In Casty, you define an actor by creating a class that inherits from Actor[M], where M is the type of message the actor accepts. Messages are defined as dataclasses, which keeps them simple, structured, and easy to serialize. Note that a plain @dataclass is mutable; use @dataclass(frozen=True) if you need messages to be immutable.
```python
from dataclasses import dataclass

from casty import Actor, Context


@dataclass
class Greet:
    name: str


class Greeter(Actor[Greet]):
    async def receive(self, msg: Greet, ctx: Context[Greet]) -> None:
        print(f"Hello, {msg.name}!")
```
The generic type Actor[Greet] is not just documentation: it lets static type checkers verify at development time that only messages of type Greet are sent to this actor. If you try to send a different type, your editor and type checker will flag it.
The Actor System
The ActorSystem is the runtime that manages the lifecycle of actors. It is responsible for creating actors, delivering messages to their mailboxes, and coordinating graceful shutdown. Every Casty program starts by creating an actor system.
Both ActorSystem and DistributedActorSystem implement the async context manager protocol, which is the recommended way to use them. This ensures that all actors are properly shut down and resources are released when the context exits, even if an exception occurs.
```python
import asyncio

from casty import ActorSystem


async def main():
    async with ActorSystem() as system:
        # Create an actor and get a reference to it
        greeter = await system.spawn(Greeter, name="greeter")

        # Send a message
        await greeter.send(Greet("World"))

        # Wait a bit for the message to be processed
        await asyncio.sleep(0.1)
    # shutdown() is called automatically when exiting the context


asyncio.run(main())
```
The spawn method creates a new instance of the actor and returns an ActorRef[M] — an opaque reference that allows sending messages to the actor. You never interact directly with the actor instance; all communication goes through the reference.
Actor References
The ActorRef is the safe way to communicate with an actor. It offers two communication patterns:
The fire-and-forget pattern with send() sends a message and returns immediately, without waiting for a response. It's useful for notifications and commands where you don't need confirmation.
```python
await greeter.send(Greet("Alice"))
```
The request-response pattern with ask() sends a message and waits for a response from the actor. This is implemented internally using asyncio futures, and you can specify a timeout.
```python
from dataclasses import dataclass

from casty import Actor, Context


@dataclass
class Increment:
    amount: int = 1


@dataclass
class GetCount:
    pass


class Counter(Actor[Increment | GetCount]):
    def __init__(self):
        self.count = 0

    async def receive(self, msg: Increment | GetCount, ctx: Context) -> None:
        match msg:
            case Increment(amount):
                self.count += amount
            case GetCount():
                ctx.reply(self.count)


# Usage (inside an async function, with a running ActorSystem)
counter = await system.spawn(Counter, name="counter")
await counter.send(Increment(5))
await counter.send(Increment(3))
value = await counter.ask(GetCount(), timeout=5.0)  # Returns 8
```
Note the use of union types (Increment | GetCount) for actors that accept multiple message types. Structural pattern matching (match/case, available since Python 3.10) pairs naturally with this pattern.
Context and Actor Hierarchy
The Context is passed to the receive method and provides important functionality during message processing. Through it, you can respond to ask requests and create child actors.
```python
@dataclass
class CreateWorker:
    id: str


class Supervisor(Actor[CreateWorker]):
    async def receive(self, msg: CreateWorker, ctx: Context) -> None:
        # Create a child actor (Worker is an actor class defined elsewhere)
        worker = await ctx.spawn(Worker, name=f"worker-{msg.id}")
        ctx.reply(worker)  # Return the reference to whoever asked
```
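Actors created via ctx.spawn() are children of the current actor. This establishes a parent-child hierarchy that can serve as the basis for supervision and failure management (Erlang/Elixir-style hierarchical supervision itself is listed under Contributing as an area for future work).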
Actor Lifecycle
Each actor has lifecycle hooks that you can override to execute code at specific moments:
```python
# Query, create_db_connection and TemporaryError are application-defined.
class DatabaseActor(Actor[Query]):
    async def on_start(self) -> None:
        """Called once when the actor starts."""
        self.connection = await create_db_connection()
        print("Connection established")

    async def on_stop(self) -> None:
        """Called when the actor is gracefully shut down."""
        await self.connection.close()
        print("Connection closed")

    def on_error(self, exc: Exception) -> bool:
        """
        Called when an exception occurs during receive().
        Return True to continue processing messages.
        Return False to terminate the actor.
        """
        if isinstance(exc, TemporaryError):
            print(f"Temporary error, continuing: {exc}")
            return True
        print(f"Fatal error, shutting down: {exc}")
        return False

    async def receive(self, msg: Query, ctx: Context) -> None:
        result = await self.connection.execute(msg.sql)
        ctx.reply(result)
```
The on_error method is particularly important for building resilient systems. It allows you to decide, for each type of exception, whether the actor should continue operating or be terminated.
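To make these semantics concrete, here is a small self-contained sketch of a delivery loop that consults an on_error-style hook. It does not use Casty and is not its internal code; `FlakyActor`, `TemporaryError`, and `run_loop` are hypothetical names, and the sketch simply assumes the runtime passes the exception raised by receive() to the hook and stops delivering messages when the hook returns False.

```python
import asyncio


class TemporaryError(Exception):
    """Hypothetical error type that the actor treats as recoverable."""


class FlakyActor:
    def __init__(self) -> None:
        self.processed: list[str] = []

    async def receive(self, msg: str) -> None:
        if msg == "flaky":
            raise TemporaryError(msg)
        if msg == "fatal":
            raise RuntimeError(msg)
        self.processed.append(msg)

    def on_error(self, exc: Exception) -> bool:
        # Same contract as above: True keeps the actor alive, False stops it.
        return isinstance(exc, TemporaryError)


async def run_loop(actor: FlakyActor, messages: list[str]) -> bool:
    """Deliver messages in order; return False if on_error stopped the actor."""
    for msg in messages:
        try:
            await actor.receive(msg)
        except Exception as exc:
            if not actor.on_error(exc):
                return False  # the remaining messages are never delivered
    return True


actor = FlakyActor()
alive = asyncio.run(run_loop(actor, ["a", "flaky", "b", "fatal", "c"]))
print(alive, actor.processed)  # → False ['a', 'b']
```

The recoverable error is skipped and delivery continues with "b"; the fatal error ends the loop, so "c" is never processed.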
Distributed Clusters
One of Casty's most powerful features is native support for distributed clusters. You can create systems where actors on different machines communicate transparently.
Creating a Cluster
To create a distributed system, use the distributed() factory method or instantiate DistributedActorSystem directly:
```python
from casty import ActorSystem, DistributedActorSystem

# Using the factory method
system = ActorSystem.distributed(
    host="192.168.1.10",
    port=8001,
    seeds=["192.168.1.11:8001", "192.168.1.12:8001"],
    expected_cluster_size=3,
)

# Or directly
system = DistributedActorSystem(
    host="192.168.1.10",
    port=8001,
    seeds=["192.168.1.11:8001", "192.168.1.12:8001"],
    expected_cluster_size=3,
)
```
The seeds parameter lists the addresses of other cluster nodes to contact for the initial connection. The expected_cluster_size parameter specifies how many nodes the cluster should have before leader election starts.
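Raft-style decisions (winning an election, committing an entry) require a strict majority, so the cluster size determines how many node failures the cluster can survive. The arithmetic below is standard quorum math; that Casty derives its majority from expected_cluster_size is an assumption based on the description above, and `majority` and `tolerated_failures` are hypothetical helper names.

```python
def majority(cluster_size: int) -> int:
    """Smallest number of nodes that forms a strict majority."""
    return cluster_size // 2 + 1


def tolerated_failures(cluster_size: int) -> int:
    """How many nodes can fail while a majority can still be reached."""
    return cluster_size - majority(cluster_size)


for size in (2, 3, 4, 5):
    print(f"{size} nodes: majority={majority(size)}, "
          f"tolerates {tolerated_failures(size)} failure(s)")
```

This prints a majority of 2, 2, 3, 3 and tolerates 0, 1, 1, 2 failures respectively. A two-node cluster (as in the chat example below) needs both nodes up, and going from 3 to 4 nodes adds no fault tolerance, which is why odd cluster sizes are the usual choice.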
Starting the Server
The easiest way to run a distributed system is using the async context manager. When you enter the context, the TCP server starts automatically in the background, connects to seed nodes, and begins the leader election process.
```python
async def main():
    async with DistributedActorSystem("0.0.0.0", 8001, expected_cluster_size=3) as system:
        # The server is already running in the background
        await system.spawn(MyService, name="my-service")

        # Do your work here...
        await asyncio.sleep(60)
    # shutdown() and cleanup are called automatically


asyncio.run(main())
```
Alternatively, if you need more control over the server lifecycle, you can call serve() directly. This method starts the TCP server, connects to seeds, participates in the peer discovery protocol, and starts the leader election loop. It blocks until shutdown() is called.
```python
async def main():
    system = DistributedActorSystem("0.0.0.0", 8001, expected_cluster_size=3)
    await system.spawn(MyService, name="my-service")

    # Start the server (blocks until shutdown)
    await system.serve()
```
Remote Actor Discovery
To find an actor on any node in the cluster, use the lookup() method:
```python
# This code can run on any node in the cluster
service = await system.lookup("my-service")

# From here, you can use service normally;
# Casty handles serialization and network communication
await service.send(ProcessRequest(data="hello"))
response = await service.ask(GetStatus())
```
lookup() first checks the local registry. If the actor is not found locally, it sends a lookup request to all connected peers. When the actor is found, lookup() returns a RemoteRef that offers the same send() and ask() interface as a local ActorRef.
Peer Discovery Protocol
Casty implements a gossip protocol for peer discovery. When a new node connects to a seed:
- The new node announces its presence to the seed
- The seed responds with the list of all known peers
- The new node connects to all peers in the list
- Each existing peer also adds the new node to its list
This creates a full mesh topology where all nodes are connected to all others, ensuring that messages can be routed directly between any two nodes.
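The four steps can be modeled with plain sets to see why sequential joins converge on a full mesh. This is an in-memory simulation only: `join` and the address strings are hypothetical, and in Casty the exchange happens over TCP (the PEER_LIST message type listed under Transport Protocol).

```python
def join(peers: dict[str, set[str]], new_node: str, seed: str) -> None:
    """Simulate the join handshake on an in-memory view of the cluster.

    peers maps each node address to the set of addresses it is connected to.
    """
    peers.setdefault(new_node, set())
    # 1. The new node announces itself to the seed.
    peers[seed].add(new_node)
    # 2. The seed replies with every peer it knows, plus itself.
    known = (peers[seed] | {seed}) - {new_node}
    for peer in known:
        # 3. The new node connects to each peer in the list.
        peers[new_node].add(peer)
        # 4. Each existing peer adds the new node to its own list.
        peers[peer].add(new_node)


cluster: dict[str, set[str]] = {"a:8001": set()}
join(cluster, "b:8001", seed="a:8001")
join(cluster, "c:8001", seed="a:8001")
join(cluster, "d:8001", seed="b:8001")  # any existing node can act as seed

# Full mesh: every node is connected to every other node.
nodes = set(cluster)
print(all(cluster[n] == nodes - {n} for n in nodes))  # → True
```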
Raft Consensus Protocol
Casty implements the Raft consensus algorithm to provide strong consistency guarantees for distributed state. The implementation covers leader election, log replication, state persistence, and snapshots; cluster membership changes are not yet supported (see Known Limitations).
Leader Election
In a cluster, at most one node is the leader in any given term, and the others are followers. Election happens automatically when the cluster is formed or when the current leader fails (detected by the absence of heartbeats).
```python
# Check if this node is the leader
if system.is_leader:
    print("This node is the leader")
else:
    print(f"The current leader is {system.leader_id}")
```
The election algorithm includes the election restriction: a node refuses to vote for a candidate whose log is less up-to-date than its own, so a candidate can only win if its log is at least as up-to-date as the logs of a majority of the cluster. This ensures that the elected leader always has all committed entries.
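"At least as up-to-date" has a precise meaning in Raft: compare the terms of the last log entries first, and only if they are equal compare log lengths. A minimal sketch of that comparison follows; `LogTail` and `is_at_least_as_up_to_date` are hypothetical names, and the sketch assumes Casty's ElectionMixin follows the rule as defined in the Raft paper.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LogTail:
    last_term: int   # term of the last log entry (0 for an empty log)
    last_index: int  # index of the last log entry (0 for an empty log)


def is_at_least_as_up_to_date(candidate: LogTail, voter: LogTail) -> bool:
    """Raft's comparison: the later last term wins; equal terms compare length."""
    if candidate.last_term != voter.last_term:
        return candidate.last_term > voter.last_term
    return candidate.last_index >= voter.last_index


print(is_at_least_as_up_to_date(LogTail(3, 5), LogTail(2, 9)))  # → True: newer last term
print(is_at_least_as_up_to_date(LogTail(3, 5), LogTail(3, 6)))  # → False: same term, shorter log
```

A voter grants its vote only when this returns True (and it has not already voted for someone else in the term), so any candidate that collects a majority of votes has a log at least as up-to-date as every node in that majority.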
Log Replication
The leader maintains a replicated log of commands. When you want to execute a command with strong consistency, use the append_command() method:
```python
# Only the leader can append commands
if system.is_leader:
    success = await system.append_command({"action": "set", "key": "x", "value": 42})
    if success:
        print("Command replicated to majority and committed")
```
The leader:
- Appends the command to its local log
- Sends AppendEntries RPCs to all followers in parallel
- Waits for a majority of nodes to acknowledge replication
- Commits the entry once a majority has replicated it
- Applies the command to the state machine
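The commit step can be sketched with the standard Raft rule: an index is committed once it is stored on a majority of nodes (the leader included) and the entry at that index belongs to the leader's current term. This is a hypothetical helper over plain lists, not Casty's LogReplicationMixin; the current-term condition comes from the Raft paper, and whether Casty applies it the same way is an assumption.

```python
def advance_commit_index(
    follower_match: list[int],
    leader_last_index: int,
    log_terms: list[int],
    current_term: int,
    commit_index: int,
) -> int:
    """Return a Raft leader's new commit index (log indices are 1-based).

    follower_match: highest index known to be replicated on each follower.
    log_terms: log_terms[i - 1] is the term of the entry at index i.
    """
    # The leader's own log counts towards the majority.
    replicated = sorted(follower_match + [leader_last_index], reverse=True)
    majority = len(replicated) // 2 + 1
    # The majority-th highest index is present on at least `majority` nodes.
    candidate = replicated[majority - 1]
    # Only entries from the current term are committed by counting replicas;
    # earlier entries then become committed indirectly along with them.
    if candidate > commit_index and log_terms[candidate - 1] == current_term:
        return candidate
    return commit_index


print(advance_commit_index([4, 2], 5, [1, 1, 2, 2, 2], current_term=2, commit_index=2))  # → 4
```

In this three-node example, index 4 is on the leader and one follower (two of three nodes), so it commits; index 5 exists only on the leader and must wait.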
Followers perform a consistency check on each AppendEntries request, ensuring their log matches the leader's before appending new entries. If there's a mismatch, the leader decrements next_index for that follower and retries until logs converge.
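Both halves of that exchange fit in a short in-memory sketch: the follower's check on (prev_index, prev_term) and truncation of conflicting entries, and the leader's decrement-and-retry loop. The names `Entry`, `handle_append_entries`, and `replicate` are hypothetical; the logic follows the standard Raft receiver rules, while Casty's real version runs asynchronously over the network.

```python
from dataclasses import dataclass


@dataclass
class Entry:
    term: int
    command: str


def handle_append_entries(log: list[Entry], prev_index: int, prev_term: int,
                          entries: list[Entry]) -> bool:
    """Follower side of AppendEntries. Indices are 1-based; prev_index 0 = empty prefix."""
    # Consistency check: our log must hold an entry at prev_index with prev_term.
    if prev_index > len(log):
        return False
    if prev_index > 0 and log[prev_index - 1].term != prev_term:
        return False
    for offset, entry in enumerate(entries):
        slot = prev_index + offset  # 0-based list position of this entry
        if slot < len(log) and log[slot].term != entry.term:
            del log[slot:]  # conflict: drop our entry and everything after it
        if slot >= len(log):
            log.append(entry)
    return True


def replicate(leader_log: list[Entry], follower_log: list[Entry]) -> int:
    """Leader side: retry with a smaller next_index until the follower accepts.

    Returns the number of rejected attempts.
    """
    next_index = len(leader_log) + 1  # optimistic start: assume the follower is current
    rejections = 0
    while True:
        prev_index = next_index - 1
        prev_term = leader_log[prev_index - 1].term if prev_index > 0 else 0
        if handle_append_entries(follower_log, prev_index, prev_term, leader_log[prev_index:]):
            return rejections
        rejections += 1
        next_index -= 1  # step back one entry and try again


leader = [Entry(1, "a"), Entry(1, "b"), Entry(2, "c"), Entry(3, "d")]
follower = [Entry(1, "a"), Entry(1, "b"), Entry(2, "c"), Entry(2, "x"), Entry(2, "y")]
print(replicate(leader, follower))  # → 1
print(follower == leader)  # → True
```

The follower's stale entries "x" and "y" (from term 2) conflict with the leader's term-3 entry, so they are discarded; the loop always terminates because prev_index 0 trivially passes the check.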
State Machine Application
To integrate with your application's state machine, set an apply callback:
```python
# Application state, kept independently on every node
state: dict = {}


async def apply_command(command: dict) -> None:
    """Called for each committed command in order."""
    match command:
        case {"action": "set", "key": key, "value": value}:
            state[key] = value
        case {"action": "delete", "key": key}:
            state.pop(key, None)


system.set_apply_callback(apply_command)
```
Commands are applied in log order, and only after they are committed (replicated to a majority). This guarantees that all nodes see the same sequence of state changes.
State Persistence
Casty persists critical Raft state to disk to survive crashes:
```python
system = DistributedActorSystem(
    "127.0.0.1",
    8001,
    expected_cluster_size=3,
    persistence_dir="/var/lib/casty/data",  # Enable persistence
    node_id="node-1",  # Use consistent node_id across restarts
)
```
The following state is persisted with fsync before responding to any RPC:
- current_term: The latest term the server has seen
- voted_for: The candidate that received this node's vote in the current term
- log entries: All log entries with their terms and commands
This ensures that after a crash and restart, the node can rejoin the cluster and continue from where it left off.
Snapshots
For clusters with large logs, Casty supports snapshotting to compact the log:
```python
import json

# `state` is the application's state machine (a plain dict)


async def create_snapshot() -> bytes:
    """Serialize current state machine state."""
    return json.dumps(state).encode()


async def restore_snapshot(data: bytes) -> None:
    """Restore state machine from snapshot."""
    state.clear()
    state.update(json.loads(data))


system.set_snapshot_callbacks(create_snapshot, restore_snapshot)

# Later, when the log grows large:
if system.is_leader:
    await system.create_snapshot()
```
When a follower is far behind (its next_index points before the leader's first log entry), the leader sends an InstallSnapshot RPC instead of log entries.
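That rule is index bookkeeping: after compaction, the log starts just past the snapshot's last included index, and any follower whose next_index falls below that point needs entries that no longer exist. A minimal sketch with a hypothetical `CompactedLog` class (not Casty's SnapshotMixin), using 1-based indices as in Raft:

```python
from dataclasses import dataclass, field


@dataclass
class CompactedLog:
    """Log whose prefix has been replaced by a snapshot (1-based indices)."""

    snapshot_last_index: int = 0  # last index covered by the snapshot
    entries: list[str] = field(default_factory=list)  # entries after the snapshot

    @property
    def first_index(self) -> int:
        return self.snapshot_last_index + 1

    def compact(self, upto: int) -> None:
        """Discard entries up to and including `upto` (assumed already snapshotted)."""
        drop = upto - self.snapshot_last_index
        self.entries = self.entries[drop:]
        self.snapshot_last_index = upto

    def rpc_for(self, next_index: int) -> str:
        # Entries before first_index are gone, so the snapshot must be shipped instead.
        # (next_index == first_index still works: its prev entry is the snapshot's last.)
        return "InstallSnapshot" if next_index < self.first_index else "AppendEntries"


log = CompactedLog(entries=[f"cmd{i}" for i in range(1, 11)])  # indices 1..10
log.compact(upto=7)
print(log.first_index, log.entries)  # → 8 ['cmd8', 'cmd9', 'cmd10']
print(log.rpc_for(3))  # → InstallSnapshot
print(log.rpc_for(9))  # → AppendEntries
```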
Message Serialization
For communication between nodes, messages need to be serialized. Casty uses msgpack and automatically supports dataclasses and Python primitive types (int, float, str, bool, None, bytes, list, dict).
```python
@dataclass
class ComplexMessage:
    id: int
    name: str
    data: dict
    tags: list[str]


# Works automatically in remote communication
await remote_actor.send(ComplexMessage(
    id=42,
    name="test",
    data={"key": "value"},
    tags=["tag1", "tag2"],
))
```
For custom types, you can register them with the system for better performance:
```python
system.register_type(ComplexMessage)
```
This adds the type to the internal registry, avoiding the need for dynamic import during deserialization.
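The difference between the registry and dynamic import can be sketched as follows. JSON stands in for msgpack so the example needs only the standard library, and the wire layout (a type name plus a field dict), `register_type`, `encode`, and `decode` here are hypothetical illustrations rather than Casty's serializer.

```python
import dataclasses
import importlib
import json
from dataclasses import dataclass
from typing import Any

# Hypothetical registry: fully qualified class name -> class.
REGISTRY: dict[str, type] = {}


def type_key(cls: type) -> str:
    return f"{cls.__module__}.{cls.__qualname__}"


def register_type(cls: type) -> None:
    REGISTRY[type_key(cls)] = cls


def encode(msg: Any) -> bytes:
    # The receiver needs the type name to know which class to rebuild.
    return json.dumps({"type": type_key(type(msg)), "fields": dataclasses.asdict(msg)}).encode()


def decode(raw: bytes) -> Any:
    doc = json.loads(raw)
    cls = REGISTRY.get(doc["type"])  # fast path: a dict lookup
    if cls is None:
        # Slow path: import the defining module by name (top-level classes only).
        module_name, _, class_name = doc["type"].rpartition(".")
        cls = getattr(importlib.import_module(module_name), class_name)
    return cls(**doc["fields"])


@dataclass
class ComplexMessage:
    id: int
    name: str
    data: dict
    tags: list[str]


register_type(ComplexMessage)
msg = ComplexMessage(id=42, name="test", data={"key": "value"}, tags=["tag1", "tag2"])
print(decode(encode(msg)) == msg)  # → True
```

Registration turns deserialization into a dictionary lookup and avoids importing modules by name at runtime; the slow path also only works if the class is importable at module top level on the receiving node.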
Complete Example: Distributed Chat System
Here's a more complete example that demonstrates several Casty features:
```python
import asyncio
from dataclasses import dataclass

from casty import Actor, Context, DistributedActorSystem


# Message definitions
@dataclass
class Join:
    username: str


@dataclass
class Leave:
    username: str


@dataclass
class Message:
    username: str
    content: str


@dataclass
class GetUsers:
    pass


# The ChatRoom actor manages a chat room
class ChatRoom(Actor[Join | Leave | Message | GetUsers]):
    def __init__(self, room_name: str):
        self.room_name = room_name
        self.users: set[str] = set()

    async def on_start(self) -> None:
        print(f"Room '{self.room_name}' started")

    async def on_stop(self) -> None:
        print(f"Room '{self.room_name}' closed with {len(self.users)} users")

    async def receive(self, msg: Join | Leave | Message | GetUsers, ctx: Context) -> None:
        match msg:
            case Join(username):
                self.users.add(username)
                print(f"[{self.room_name}] {username} joined the room")
            case Leave(username):
                self.users.discard(username)
                print(f"[{self.room_name}] {username} left the room")
            case Message(username, content):
                print(f"[{self.room_name}] {username}: {content}")
            case GetUsers():
                ctx.reply(list(self.users))


async def run_node(host: str, port: int, seeds: list[str], is_main: bool):
    """Run a cluster node."""
    async with DistributedActorSystem(host, port, seeds, expected_cluster_size=2) as system:
        # The main node creates the chat room
        if is_main:
            await system.spawn(ChatRoom, name="general", room_name="General")

        # Wait for cluster to be ready
        await asyncio.sleep(2)

        # Any node can interact with the room
        room = await system.lookup("general")
        node_user = f"user-{port}"

        await room.send(Join(node_user))
        await room.send(Message(node_user, f"Hello from port {port}!"))

        users = await room.ask(GetUsers())
        print(f"Users in room: {users}")

        # Keep running for a while
        await asyncio.sleep(5)
        await room.send(Leave(node_user))
    # shutdown() and cleanup are called automatically


# Run two nodes
async def main():
    await asyncio.gather(
        run_node("127.0.0.1", 8001, [], is_main=True),
        run_node("127.0.0.1", 8002, ["127.0.0.1:8001"], is_main=False),
    )


if __name__ == "__main__":
    asyncio.run(main())
```
Internal Architecture
For those interested in understanding how Casty works internally, here's an overview of the architecture.
Module Structure
The code is organized into well-defined layers. The core layer contains actor.py with the fundamental primitives (Actor, ActorRef, Context, Envelope) and system.py with the local runtime. The cluster layer in cluster/ extends the system with distributed capabilities through specialized mixins.
The mixins follow the single responsibility principle:
- PeerMixin: Manages TCP connections between nodes and the gossip protocol
- MessagingMixin: Implements lookup and remote message sending
- ElectionMixin: Handles leader election with the election restriction
- PersistenceMixin: Manages durable state (term, voted_for, log) with fsync
- LogReplicationMixin: Implements the AppendEntries RPC and commit advancement
- SnapshotMixin: Handles snapshot creation and the InstallSnapshot RPC
The DistributedActorSystem inherits from all of them, composing the functionalities.
The Envelope Pattern
Internally, messages are encapsulated in an Envelope before being placed in the actor's mailbox. The envelope carries the original message and, optionally, a future for the ask pattern. This allows the system to implement request-response without exposing this complexity to the user.
Transport Protocol
Communication between nodes uses TCP with a simple binary protocol. Each message on the network is prefixed with its size (length-prefix framing), allowing the receiver to know exactly how many bytes to read. The format is: message type (1 byte), actor name length (2 bytes), actor name, payload length (4 bytes), and msgpack-serialized payload.
Message types include:
- ACTOR_MSG (1): Regular actor messages
- LOOKUP_REQ/RES (2-3): Actor discovery
- PEER_LIST (4): Gossip protocol
- ASK_REQ/RES (5-6): Request-response pattern
- VOTE_REQUEST/RESPONSE (7-8): Raft election
- HEARTBEAT (9): Legacy heartbeat (now uses AppendEntries)
- APPEND_ENTRIES_REQ/RES (12-13): Log replication
- INSTALL_SNAPSHOT_REQ/RES (14-15): Snapshot transfer
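The field layout described above can be packed and parsed with the standard struct module. This sketch assumes big-endian unsigned integers, which the description does not specify, and encodes only the listed fields; whether an additional outer size prefix wraps each frame is not stated, so none is added. `encode_frame` and `read_frame` are hypothetical names.

```python
import asyncio
import struct

# A few of the message type codes listed above.
ACTOR_MSG = 1
LOOKUP_REQ = 2
LOOKUP_RES = 3


def encode_frame(msg_type: int, actor_name: str, payload: bytes) -> bytes:
    """Pack one frame: type (1 byte), name length (2), name, payload length (4), payload."""
    name = actor_name.encode("utf-8")
    # Assumption: big-endian unsigned integers (">B", ">H", ">I").
    return (struct.pack(">BH", msg_type, len(name)) + name
            + struct.pack(">I", len(payload)) + payload)


async def read_frame(reader: asyncio.StreamReader) -> tuple[int, str, bytes]:
    """Read exactly one frame; readexactly() waits until each field is complete."""
    msg_type, name_len = struct.unpack(">BH", await reader.readexactly(3))
    name = (await reader.readexactly(name_len)).decode("utf-8")
    (payload_len,) = struct.unpack(">I", await reader.readexactly(4))
    payload = await reader.readexactly(payload_len)
    return msg_type, name, payload


async def demo() -> tuple[int, str, bytes]:
    reader = asyncio.StreamReader()
    # b"\x81\xa4name\xa5World" is the msgpack encoding of {"name": "World"}.
    reader.feed_data(encode_frame(ACTOR_MSG, "greeter", b"\x81\xa4name\xa5World"))
    reader.feed_eof()
    return await read_frame(reader)


print(asyncio.run(demo()))  # → (1, 'greeter', b'\x81\xa4name\xa5World')
```

The length fields are what let the receiver read a frame with readexactly() instead of scanning for delimiters, even when TCP splits or merges writes.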
Type Safety in Depth
The use of generics in Actor[M], ActorRef[M], and Context[M] creates a chain of types that flows from actor definition to message sending. When you call await system.spawn(MyActor), type checkers infer from MyActor's definition that the result is an ActorRef[MyMessage]. This lets tools like mypy and pyright catch mismatched messages at development time.
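The inference chain can be reproduced in a few lines. This is a hypothetical miniature, not Casty's classes: it is written with `TypeVar` and `Generic` so it also runs on interpreters older than 3.12 (with PEP 695 the same class reads `class Actor[M]:`), and its `spawn` is synchronous and delivers messages directly, with no mailbox.

```python
import asyncio
from dataclasses import dataclass
from typing import Generic, TypeVar

M = TypeVar("M")


class Actor(Generic[M]):
    async def receive(self, msg: M) -> None:
        raise NotImplementedError


class ActorRef(Generic[M]):
    """Toy reference: delivers straight to the actor, with no mailbox."""

    def __init__(self, actor: Actor[M]) -> None:
        self._actor = actor

    async def send(self, msg: M) -> None:
        await self._actor.receive(msg)


def spawn(actor_cls: type[Actor[M]]) -> ActorRef[M]:
    # A checker solves M from the class's base: type[Greeter] -> Actor[Greet] -> M = Greet.
    return ActorRef(actor_cls())


@dataclass
class Greet:
    name: str


class Greeter(Actor[Greet]):
    def __init__(self) -> None:
        self.greeted: list[str] = []

    async def receive(self, msg: Greet) -> None:
        self.greeted.append(msg.name)


ref = spawn(Greeter)  # inferred as ActorRef[Greet]
asyncio.run(ref.send(Greet("World")))
# asyncio.run(ref.send(42))  # rejected by mypy/pyright: "int" is not "Greet"
```

At runtime nothing stops the commented-out call; the protection comes entirely from the static checker following M from Greeter's base class to the reference's send() signature.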
Design Philosophy
Casty was designed with several principles in mind.
Minimalism over features: The library does one thing well — the actor model. It doesn't try to be a complete framework with HTTP, databases, or other integrations. You compose Casty with other libraries as needed.
Type safety without boilerplate: Python 3.12+'s type system allows expressing powerful constraints without complex decorators or verbose configuration. A simple Actor[MyMessage] captures the intent clearly and verifiably.
Location transparency: The code that interacts with a local actor is identical to code that interacts with a remote actor. This allows you to develop and test locally, then distribute without changes to business code.
Failures as first-class citizens: The on_error method and lifecycle hooks acknowledge that failures happen. Instead of hiding this, Casty gives you the tools to handle failures explicitly.
Strong consistency when needed: The Raft implementation ensures that all nodes apply the same sequence of committed operations to their state machines.
Known Limitations
There is no backpressure in actor mailboxes. If an actor receives messages faster than it can process them, the queue grows indefinitely. For high-load production systems, consider implementing rate limiting at the application level.
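One application-level option, an alternative to rate limiting, is to put a bounded queue in front of a slow consumer so that producers are suspended while it is full. The sketch below uses only asyncio and does not change Casty's own (unbounded) mailboxes; the names and the maxsize of 10 are illustrative.

```python
import asyncio


async def main() -> tuple[int, int]:
    mailbox: asyncio.Queue[int] = asyncio.Queue(maxsize=10)  # bounded buffer
    processed = 0
    max_seen = 0

    async def consumer() -> None:
        nonlocal processed
        while True:
            await mailbox.get()
            await asyncio.sleep(0)  # stand-in for slow work
            processed += 1
            mailbox.task_done()

    async def producer() -> None:
        nonlocal max_seen
        for i in range(100):
            await mailbox.put(i)  # suspends while the queue is full
            max_seen = max(max_seen, mailbox.qsize())

    worker = asyncio.create_task(consumer())
    await producer()
    await mailbox.join()  # wait until every item has been processed
    worker.cancel()
    return processed, max_seen


processed, max_seen = asyncio.run(main())
print(processed, max_seen <= 10)  # → 100 True
```

The producer never gets more than ten items ahead, so memory stays bounded; the cost is that a fast producer is slowed to the consumer's pace.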
Membership changes (adding/removing nodes from a running cluster) are not yet supported. The cluster size must be known at startup.
Contributing
Contributions are welcome! Casty is a young project with plenty of room for improvements. Some areas of interest include Erlang/Elixir-style hierarchical supervision support, metrics and observability, membership changes (joint consensus), and performance optimizations.
License
MIT License - see the LICENSE file for details.
Download files
File details
Details for the file casty-0.1.0.tar.gz.
File metadata
- Download URL: casty-0.1.0.tar.gz
- Upload date:
- Size: 58.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.5.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | ac0971f06a9224ac8aff08581ecd49748ea20f53164957d3403e72a5022e3b23 |
| MD5 | 90cea99c7156187cfc22630415259c43 |
| BLAKE2b-256 | 8951b5f720e4120f3f996059c35a90a86b19bdd3feacf2741b22f4abe0f9158e |
File details
Details for the file casty-0.1.0-py3-none-any.whl.
File metadata
- Download URL: casty-0.1.0-py3-none-any.whl
- Upload date:
- Size: 40.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.5.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | a1c4734ea842a35e242ce1d570face270b1e9f4bdd7cdc7b1db09fb81726b062 |
| MD5 | e3345d49d4eb51468c71bffa7488b8b3 |
| BLAKE2b-256 | f45533340a065f7691460d63f715347a92f6cd41f57a68d579bd86fea4838c15 |