Skip to main content

A minimalist, type-safe actor library for Python 3.12+ with distributed cluster support

Project description

Casty

Casty is a minimalist, type-safe actor library for Python 3.12+ with distributed cluster support. It implements the actor model elegantly, combining ease of use with type safety through Python's modern advanced type system.

What is the Actor Model?

The actor model is a concurrent programming paradigm where the fundamental unit of computation is the actor. Each actor is an isolated entity that has its own internal state, processes messages sequentially through a mailbox, and can create other actors or send messages to known actors. There is no shared memory between actors — all communication happens exclusively through asynchronous message passing.

This model drastically simplifies concurrent programming by eliminating classic problems like race conditions and deadlocks that arise with locks and shared memory. Casty brings this paradigm to the Python ecosystem in an idiomatic way, leveraging asyncio and the modern type system.

Installation

Using uv:

uv add git+https://github.com/gabfssilva/casty.git

Or using pip:

pip install git+https://github.com/gabfssilva/casty.git

Casty requires Python 3.12 or higher due to its use of PEP 695 generics syntax.

Core Concepts

Actors and Messages

In Casty, you define an actor by creating a class that inherits from Actor[M], where M is the type of message the actor accepts. Messages are defined as dataclasses, ensuring they are immutable data structures that are easily serializable.

from dataclasses import dataclass
from casty import Actor, Context

@dataclass
class Greet:
    name: str

class Greeter(Actor[Greet]):
    async def receive(self, msg: Greet, ctx: Context[Greet]) -> None:
        print(f"Hello, {msg.name}!")

The generic type Actor[Greet] is not just documentation — it guarantees at development time that only messages of type Greet can be sent to this actor. If you try to send a different type, your editor and type checker will alert you.

The Actor System

The ActorSystem is the runtime that manages the lifecycle of actors. It is responsible for creating actors, delivering messages to their mailboxes, and coordinating graceful shutdown. Every Casty program starts by creating an actor system.

Both ActorSystem and DistributedActorSystem implement the async context manager protocol, which is the recommended way to use them. This ensures that all actors are properly shut down and resources are released when the context exits, even if an exception occurs.

import asyncio
from casty import ActorSystem

async def main():
    async with ActorSystem() as system:
        # Create an actor and get a reference to it
        greeter = await system.spawn(Greeter, name="greeter")

        # Send a message
        await greeter.send(Greet("World"))

        # Wait a bit for the message to be processed
        await asyncio.sleep(0.1)

    # shutdown() is called automatically when exiting the context

asyncio.run(main())

The spawn method creates a new instance of the actor and returns an ActorRef[M] — an opaque reference that allows sending messages to the actor. You never interact directly with the actor instance; all communication goes through the reference.

Actor References

The ActorRef is the safe way to communicate with an actor. It offers two communication patterns:

The fire-and-forget pattern with send() sends a message and returns immediately, without waiting for a response. It's useful for notifications and commands where you don't need confirmation.

await greeter.send(Greet("Alice"))

The request-response pattern with ask() sends a message and waits for a response from the actor. This is implemented internally using asyncio futures, and you can specify a timeout.

from dataclasses import dataclass
from casty import Actor, Context

@dataclass
class Increment:
    amount: int = 1

@dataclass
class GetCount:
    pass

class Counter(Actor[Increment | GetCount]):
    def __init__(self):
        self.count = 0

    async def receive(self, msg: Increment | GetCount, ctx: Context) -> None:
        match msg:
            case Increment(amount):
                self.count += amount
            case GetCount():
                ctx.reply(self.count)

# Usage
counter = await system.spawn(Counter, name="counter")
await counter.send(Increment(5))
await counter.send(Increment(3))
value = await counter.ask(GetCount(), timeout=5.0)  # Returns 8

Note the use of union types (Increment | GetCount) for actors that accept multiple message types. Python 3.10+'s pattern matching combines perfectly with this pattern.

Context and Actor Hierarchy

The Context is passed to the receive method and provides important functionality during message processing. Through it, you can respond to ask requests and create child actors.

@dataclass
class CreateWorker:
    id: str

class Supervisor(Actor[CreateWorker]):
    async def receive(self, msg: CreateWorker, ctx: Context) -> None:
        # Create a child actor
        worker = await ctx.spawn(Worker, name=f"worker-{msg.id}")
        ctx.reply(worker)  # Return the reference to whoever asked

Actors created via ctx.spawn() are children of the current actor. This establishes a hierarchy that can be used for supervision and failure management.

Actor Lifecycle

Each actor has lifecycle hooks that you can override to execute code at specific moments:

class DatabaseActor(Actor[Query]):
    async def on_start(self) -> None:
        """Called once when the actor starts."""
        self.connection = await create_db_connection()
        print("Connection established")

    async def on_stop(self) -> None:
        """Called when the actor is gracefully shut down."""
        await self.connection.close()
        print("Connection closed")

    def on_error(self, exc: Exception) -> bool:
        """
        Called when an exception occurs during receive().
        Return True to continue processing messages.
        Return False to terminate the actor.
        """
        if isinstance(exc, TemporaryError):
            print(f"Temporary error, continuing: {exc}")
            return True
        print(f"Fatal error, shutting down: {exc}")
        return False

    async def receive(self, msg: Query, ctx: Context) -> None:
        result = await self.connection.execute(msg.sql)
        ctx.reply(result)

The on_error method is particularly important for building resilient systems. It allows you to decide, for each type of exception, whether the actor should continue operating or be terminated.

Distributed Clusters

One of Casty's most powerful features is native support for distributed clusters. You can create systems where actors on different machines communicate transparently.

Creating a Cluster

To create a distributed system, use the distributed() factory method or instantiate DistributedActorSystem directly:

from casty import ActorSystem, DistributedActorSystem

# Using the factory method
system = ActorSystem.distributed(
    host="192.168.1.10",
    port=8001,
    seeds=["192.168.1.11:8001", "192.168.1.12:8001"],
    expected_cluster_size=3
)

# Or directly
system = DistributedActorSystem(
    host="192.168.1.10",
    port=8001,
    seeds=["192.168.1.11:8001", "192.168.1.12:8001"],
    expected_cluster_size=3
)

The seeds parameter specifies the addresses of other cluster nodes for initial connection. The expected_cluster_size tells how many nodes the cluster should have before starting leader election.

Starting the Server

The easiest way to run a distributed system is using the async context manager. When you enter the context, the TCP server starts automatically in the background, connects to seed nodes, and begins the leader election process.

async def main():
    async with DistributedActorSystem("0.0.0.0", 8001, expected_cluster_size=3) as system:
        # The server is already running in the background
        await system.spawn(MyService, name="my-service")

        # Do your work here...
        await asyncio.sleep(60)

    # shutdown() and cleanup are called automatically

asyncio.run(main())

Alternatively, if you need more control over the server lifecycle, you can call serve() directly. This method starts the TCP server, connects to seeds, participates in the peer discovery protocol, and starts the leader election loop. It blocks until shutdown() is called.

async def main():
    system = DistributedActorSystem("0.0.0.0", 8001, expected_cluster_size=3)
    await system.spawn(MyService, name="my-service")

    # Start the server (blocks until shutdown)
    await system.serve()

Remote Actor Discovery

To find an actor on any node in the cluster, use the lookup() method:

# This code can run on any node in the cluster
service = await system.lookup("my-service")

# From here, you can use service normally
# Casty handles serialization and network communication
await service.send(ProcessRequest(data="hello"))
response = await service.ask(GetStatus())

The lookup() first checks the local registry. If the actor is not found locally, it sends a lookup request to all connected peers. When found, it returns a RemoteRef that behaves exactly like a local ActorRef.

Peer Discovery Protocol

Casty implements a gossip protocol for peer discovery. When a new node connects to a seed:

  1. The new node announces its presence to the seed
  2. The seed responds with the list of all known peers
  3. The new node connects to all peers in the list
  4. Each existing peer also adds the new node to its list

This creates a full mesh topology where all nodes are connected to all others, ensuring that messages can be routed directly between any two nodes.

Raft Consensus Protocol

Casty implements the complete Raft consensus algorithm, providing strong consistency guarantees for distributed state. The implementation includes leader election, log replication, and state persistence.

Leader Election

In a cluster, only one node is the leader at any time, and the others are followers. Election happens automatically when the cluster is formed or when the current leader fails (detected by the absence of heartbeats).

# Check if this node is the leader
if system.is_leader:
    print(f"This node is the leader")
else:
    print(f"The current leader is {system.leader_id}")

The election algorithm includes the election restriction: a candidate can only win an election if its log is at least as up-to-date as the majority of the cluster. This ensures that the elected leader always has all committed entries.

Log Replication

The leader maintains a replicated log of commands. When you want to execute a command with strong consistency, use the append_command() method:

# Only the leader can append commands
if system.is_leader:
    success = await system.append_command({"action": "set", "key": "x", "value": 42})
    if success:
        print("Command replicated to majority and committed")

The leader:

  1. Appends the command to its local log
  2. Sends AppendEntries RPCs to all followers in parallel
  3. Waits for a majority of nodes to acknowledge replication
  4. Commits the entry once a majority has replicated it
  5. Applies the command to the state machine

Followers perform a consistency check on each AppendEntries request, ensuring their log matches the leader's before appending new entries. If there's a mismatch, the leader decrements next_index for that follower and retries until logs converge.

State Machine Application

To integrate with your application's state machine, set an apply callback:

async def apply_command(command: dict) -> None:
    """Called for each committed command in order."""
    match command:
        case {"action": "set", "key": key, "value": value}:
            state[key] = value
        case {"action": "delete", "key": key}:
            state.pop(key, None)

system.set_apply_callback(apply_command)

Commands are applied in log order, and only after they are committed (replicated to a majority). This guarantees that all nodes see the same sequence of state changes.

State Persistence

Casty persists critical Raft state to disk to survive crashes:

system = DistributedActorSystem(
    "127.0.0.1",
    8001,
    expected_cluster_size=3,
    persistence_dir="/var/lib/casty/data",  # Enable persistence
    node_id="node-1",  # Use consistent node_id across restarts
)

The following state is persisted with fsync before responding to any RPC:

  • current_term: The latest term the server has seen
  • voted_for: The candidate that received this node's vote in the current term
  • log entries: All log entries with their terms and commands

This ensures that after a crash and restart, the node can rejoin the cluster and continue from where it left off.

Snapshots

For clusters with large logs, Casty supports snapshotting to compact the log:

async def create_snapshot() -> bytes:
    """Serialize current state machine state."""
    return json.dumps(state).encode()

async def restore_snapshot(data: bytes) -> None:
    """Restore state machine from snapshot."""
    state.clear()
    state.update(json.loads(data))

system.set_snapshot_callbacks(create_snapshot, restore_snapshot)

# Later, when log grows large:
if system.is_leader:
    await system.create_snapshot()

When a follower is far behind (its next_index points before the leader's first log entry), the leader sends an InstallSnapshot RPC instead of log entries.

Message Serialization

For communication between nodes, messages need to be serialized. Casty uses msgpack and automatically supports dataclasses and Python primitive types (int, float, str, bool, None, bytes, list, dict).

@dataclass
class ComplexMessage:
    id: int
    name: str
    data: dict
    tags: list[str]

# Works automatically in remote communication
await remote_actor.send(ComplexMessage(
    id=42,
    name="test",
    data={"key": "value"},
    tags=["tag1", "tag2"]
))

For custom types, you can register them with the system for better performance:

system.register_type(ComplexMessage)

This adds the type to the internal registry, avoiding the need for dynamic import during deserialization.

Complete Example: Distributed Chat System

Here's a more complete example that demonstrates several Casty features:

import asyncio
from dataclasses import dataclass
from casty import Actor, Context, DistributedActorSystem

# Message definitions
@dataclass
class Join:
    username: str

@dataclass
class Leave:
    username: str

@dataclass
class Message:
    username: str
    content: str

@dataclass
class GetUsers:
    pass

# The ChatRoom actor manages a chat room
class ChatRoom(Actor[Join | Leave | Message | GetUsers]):
    def __init__(self, room_name: str):
        self.room_name = room_name
        self.users: set[str] = set()

    async def on_start(self) -> None:
        print(f"Room '{self.room_name}' started")

    async def on_stop(self) -> None:
        print(f"Room '{self.room_name}' closed with {len(self.users)} users")

    async def receive(self, msg: Join | Leave | Message | GetUsers, ctx: Context) -> None:
        match msg:
            case Join(username):
                self.users.add(username)
                print(f"[{self.room_name}] {username} joined the room")

            case Leave(username):
                self.users.discard(username)
                print(f"[{self.room_name}] {username} left the room")

            case Message(username, content):
                print(f"[{self.room_name}] {username}: {content}")

            case GetUsers():
                ctx.reply(list(self.users))

async def run_node(host: str, port: int, seeds: list[str], is_main: bool):
    """Run a cluster node."""
    async with DistributedActorSystem(host, port, seeds, expected_cluster_size=2) as system:
        # The main node creates the chat room
        if is_main:
            await system.spawn(ChatRoom, name="general", room_name="General")

        # Wait for cluster to be ready
        await asyncio.sleep(2)

        # Any node can interact with the room
        room = await system.lookup("general")

        node_user = f"user-{port}"
        await room.send(Join(node_user))
        await room.send(Message(node_user, f"Hello from port {port}!"))

        users = await room.ask(GetUsers())
        print(f"Users in room: {users}")

        # Keep running for a while
        await asyncio.sleep(5)

        await room.send(Leave(node_user))

    # shutdown() and cleanup are called automatically

# Run two nodes
async def main():
    await asyncio.gather(
        run_node("127.0.0.1", 8001, [], is_main=True),
        run_node("127.0.0.1", 8002, ["127.0.0.1:8001"], is_main=False),
    )

if __name__ == "__main__":
    asyncio.run(main())

Internal Architecture

For those interested in understanding how Casty works internally, here's an overview of the architecture.

Module Structure

The code is organized into well-defined layers. The core layer contains actor.py with the fundamental primitives (Actor, ActorRef, Context, Envelope) and system.py with the local runtime. The cluster layer in cluster/ extends the system with distributed capabilities through specialized mixins.

The mixins follow the single responsibility principle:

  • PeerMixin: Manages TCP connections between nodes and the gossip protocol
  • MessagingMixin: Implements lookup and remote message sending
  • ElectionMixin: Handles leader election with election restriction
  • PersistenceMixin: Manages durable state (term, voted_for, log) with fsync
  • LogReplicationMixin: Implements AppendEntries RPC and commit advancement
  • SnapshotMixin: Handles snapshot creation and InstallSnapshot RPC

The DistributedActorSystem inherits from all of them, composing the functionalities.

The Envelope Pattern

Internally, messages are encapsulated in an Envelope before being placed in the actor's mailbox. The envelope carries the original message and, optionally, a future for the ask pattern. This allows the system to implement request-response without exposing this complexity to the user.

Transport Protocol

Communication between nodes uses TCP with a simple binary protocol. Each message on the network is prefixed with its size (length-prefix framing), allowing the receiver to know exactly how many bytes to read. The format is: message type (1 byte), actor name length (2 bytes), actor name, payload length (4 bytes), and msgpack-serialized payload.

Message types include:

  • ACTOR_MSG (1): Regular actor messages
  • LOOKUP_REQ/RES (2-3): Actor discovery
  • PEER_LIST (4): Gossip protocol
  • ASK_REQ/RES (5-6): Request-response pattern
  • VOTE_REQUEST/RESPONSE (7-8): Raft election
  • HEARTBEAT (9): Legacy heartbeat (now uses AppendEntries)
  • APPEND_ENTRIES_REQ/RES (12-13): Log replication
  • INSTALL_SNAPSHOT_REQ/RES (14-15): Snapshot transfer

Type Safety in Depth

The use of generics in Actor[M], ActorRef[M], and Context[M] creates a chain of types that flows from the moment of actor definition to message sending. When you do await system.spawn(MyActor), the system infers that the return is ActorRef[MyMessage] based on MyActor's definition. This allows type checkers like mypy and pyright to detect errors at development time.

Design Philosophy

Casty was designed with several principles in mind.

Minimalism over features: The library does one thing well — the actor model. It doesn't try to be a complete framework with HTTP, databases, or other integrations. You compose Casty with other libraries as needed.

Type safety without boilerplate: Python 3.12+'s type system allows expressing powerful constraints without complex decorators or verbose configuration. A simple Actor[MyMessage] captures the intent clearly and verifiably.

Location transparency: The code that interacts with a local actor is identical to code that interacts with a remote actor. This allows you to develop and test locally, then distribute without changes to business code.

Failures as first-class citizens: The on_error method and lifecycle hooks acknowledge that failures happen. Instead of hiding this, Casty gives you the tools to handle failures explicitly.

Strong consistency when needed: The full Raft implementation provides linearizable semantics for distributed state, ensuring all nodes see the same sequence of committed operations.

Known Limitations

There is no backpressure in actor mailboxes. If an actor receives messages faster than it can process them, the queue grows indefinitely. For high-load production systems, consider implementing rate limiting at the application level.

Membership changes (adding/removing nodes from a running cluster) are not yet supported. The cluster size must be known at startup.

Contributing

Contributions are welcome! Casty is a young project with plenty of room for improvements. Some areas of interest include Erlang/Elixir-style hierarchical supervision support, metrics and observability, membership changes (joint consensus), and performance optimizations.

License

MIT License - see the LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

casty-0.1.0.tar.gz (58.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

casty-0.1.0-py3-none-any.whl (40.5 kB view details)

Uploaded Python 3

File details

Details for the file casty-0.1.0.tar.gz.

File metadata

  • Download URL: casty-0.1.0.tar.gz
  • Upload date:
  • Size: 58.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.5.9

File hashes

Hashes for casty-0.1.0.tar.gz
Algorithm Hash digest
SHA256 ac0971f06a9224ac8aff08581ecd49748ea20f53164957d3403e72a5022e3b23
MD5 90cea99c7156187cfc22630415259c43
BLAKE2b-256 8951b5f720e4120f3f996059c35a90a86b19bdd3feacf2741b22f4abe0f9158e

See more details on using hashes here.

File details

Details for the file casty-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: casty-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 40.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.5.9

File hashes

Hashes for casty-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 a1c4734ea842a35e242ce1d570face270b1e9f4bdd7cdc7b1db09fb81726b062
MD5 e3345d49d4eb51468c71bffa7488b8b3
BLAKE2b-256 f45533340a065f7691460d63f715347a92f6cd41f57a68d579bd86fea4838c15

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page