# Actors

> Understand the actor model and message-based communication in Autonomy.

Autonomy is designed on principles from the **actor model**, a powerful
pattern for building concurrent and distributed systems. Before we dive into
agents, it's helpful to understand this foundation of actors.

Think of actors as independent members of a team. Each has their own
workspace (state), and they communicate only by passing notes (messages) to each
other. No one shares their workspace, which prevents contention and makes
applications naturally parallelizable and scalable.

***

**Actors** are lightweight, stateful objects that communicate using messages.

Each actor has:

* **A unique address** that other actors use to send it messages.
* **A mailbox** where incoming messages queue up and wait to be processed.
* **Internal state** that only this actor can access or modify.
* **Behavior**, logic that defines how this actor reacts to each message it receives.

When an actor receives a message, it can:

1. **Send messages** to other actors (asynchronously, without waiting for responses).
2. **Create new actors** and delegate work to them.
3. **Update its internal state** which may affect how future messages are handled.
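
The parts and actions listed above can be sketched in plain `asyncio`. This is not Autonomy's runtime or API; `CounterActor`, `send`, and `run` are names invented for this illustration. The sketch shows a mailbox, private state, non-blocking sends, and one-at-a-time processing.

```python
import asyncio


class CounterActor:
    """A toy actor: private state, a mailbox, and one-at-a-time processing."""

    def __init__(self):
        self.mailbox = asyncio.Queue()  # incoming messages wait here
        self.count = 0                  # internal state, touched only by run()

    def send(self, message, reply_to=None):
        # Non-blocking: enqueue the message and return immediately.
        self.mailbox.put_nowait((message, reply_to))

    async def run(self):
        # Behavior: take one message at a time from the mailbox.
        while True:
            message, reply_to = await self.mailbox.get()
            if message == "stop":
                break
            if message == "increment":
                self.count += 1
            if reply_to is not None:
                reply_to.put_nowait(f"count={self.count}")


async def demo():
    actor = CounterActor()
    task = asyncio.create_task(actor.run())
    replies = asyncio.Queue()

    # Three sends return before any of them is processed.
    for _ in range(3):
        actor.send("increment", reply_to=replies)
    queued_before_processing = actor.mailbox.qsize()

    results = [await replies.get() for _ in range(3)]
    actor.send("stop")
    await task
    return queued_before_processing, results


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

All three messages are queued before the actor handles the first one, and the replies arrive in the order the messages were sent.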

***

**No shared state**

Each actor's state is completely private. Other actors cannot directly access
or modify it. This eliminates the data races that come with shared mutable state
and removes the need for locks around that state. Actors can only influence each
other by sending messages.

**Message Processing**

While many actors run in parallel across the system, each individual actor
processes only one message at a time. Messages wait in the actor's mailbox and
are handled sequentially. This means you never need to worry about two threads
modifying an actor's state simultaneously; it simply can't happen.

**Asynchronous message passing**

When Actor A sends a message to Actor B, it doesn't have to wait for B to
process it or respond. The message is queued in B's mailbox, and A can continue
with its work immediately. This non-blocking communication allows actors to work
concurrently without waiting on each other.

**Location Transparency**

Sending or receiving a message from another actor looks the
same whether that other actor is running on the same machine or across the network.
This makes it natural to distribute work and scale horizontally.
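
One way to picture location transparency is a router that maps addresses to routes, so the calling code never changes. The sketch below is a toy model, not how Autonomy routes messages; `Router`, `LocalRoute`, and `SimulatedRemoteRoute` are hypothetical names, and the "network" is simulated by encoding to bytes and sleeping briefly.

```python
import asyncio
import json


class LocalRoute:
    """Delivers straight into an in-process mailbox."""

    def __init__(self, mailbox):
        self.mailbox = mailbox

    async def deliver(self, message):
        await self.mailbox.put(message)


class SimulatedRemoteRoute:
    """Pretends to cross a network: encode to bytes, then decode on arrival."""

    def __init__(self, mailbox):
        self.mailbox = mailbox

    async def deliver(self, message):
        wire_bytes = json.dumps(message).encode("utf-8")  # what would be sent
        await asyncio.sleep(0.01)                         # simulated latency
        await self.mailbox.put(json.loads(wire_bytes.decode("utf-8")))


class Router:
    def __init__(self):
        self.routes = {}

    def register(self, address, route):
        self.routes[address] = route

    async def send(self, address, message):
        # The sender names an address; the route decides how to get there.
        await self.routes[address].deliver(message)


async def demo():
    local_box, remote_box = asyncio.Queue(), asyncio.Queue()
    router = Router()
    router.register("greeter-local", LocalRoute(local_box))
    router.register("greeter-remote", SimulatedRemoteRoute(remote_box))

    # Identical calling code for both destinations.
    for address in ("greeter-local", "greeter-remote"):
        await router.send(address, "Alice")
    return await local_box.get(), await remote_box.get()


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ('Alice', 'Alice')
```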

***

**In Autonomy:**

* Simple actors are called workers.
* Agents are intelligent autonomous actors that use large language models.
* Both follow the actor model.

When an actor is idle and there are no messages in its mailbox,
it consumes no CPU. This design makes it easy to run thousands of concurrent
stateful actors that make efficient use of the available CPU cores.

Agents, for example, spend most of their lifetime waiting for calls
to language models or tools to finish. While one agent is waiting, the actor
runtime automatically gives the CPU core to a different actor that has a
new message to process.

This enables highly efficient and horizontally scalable applications.
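
The effect of waiting without holding a CPU core can be illustrated with plain `asyncio`. This is a single-threaded stand-in, not Autonomy's Rust runtime. `simulated_agent` and `run_agents` are hypothetical names, and `asyncio.sleep` stands in for a model or tool call.

```python
import asyncio
import time


async def simulated_agent(agent_id, wait_seconds):
    # Stand-in for an agent waiting on a model or tool call.
    # While this coroutine is suspended it uses no CPU.
    await asyncio.sleep(wait_seconds)
    return agent_id


async def run_agents(count=1000, wait_seconds=0.1):
    start = time.perf_counter()
    results = await asyncio.gather(
        *(simulated_agent(i, wait_seconds) for i in range(count))
    )
    elapsed = time.perf_counter() - start
    return len(results), elapsed


if __name__ == "__main__":
    finished, elapsed = asyncio.run(run_agents())
    print(f"{finished} agents finished in {elapsed:.2f}s")
```

Because the waits overlap, a thousand simulated agents finish in roughly the time of one wait, not the sum of all of them.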

***

## Workers

Workers are Autonomy's implementation of actors.

Autonomy's actor runtime is implemented in Rust and exposed to Python code
through the `Node` class. All nodes in a zone can create encrypted, mutually
authenticated communication channels with other nodes.

***

Here's a simple worker that replies to each message with a greeting:

<CodeGroup>
  ```python images/main/main.py theme={null}
  from autonomy import Node


  class Greeter:
      async def handle_message(self, context, message):
          await context.reply(f"Hello, {message}!")


  async def main(node):
      # Start the worker
      await node.start_worker("greeter", Greeter())
      
      # Send a message and wait for reply
      reply = await node.send_and_receive("greeter", "Alice", timeout=10)
      print(reply)  # "Hello, Alice!"


  Node.start(main)
  ```

  ```dockerfile images/main/Dockerfile theme={null}
  FROM ghcr.io/build-trust/autonomy-python
  COPY . .
  ENTRYPOINT ["python", "main.py"]
  ```

  ```yaml autonomy.yaml theme={null}
  name: hello
  pods:
    - name: main-pod
      containers:
        - name: main
          image: main
  ```
</CodeGroup>

**What's happening:**

1. Define a worker class with `handle_message()`.
2. When `Node.start(main)` is called, it turns the `main` function itself into a worker that can send and receive messages like other workers. This is why `main` is able to communicate with `greeter`.
3. Start the `Greeter` worker with a unique name (`greeter`).
4. Send a message from `main` to the `greeter` worker.
5. The `greeter` worker processes the message and replies.
6. The `main` worker receives the reply.

**Worker Lifecycle**

Workers can be started and stopped dynamically:

```python theme={null}
# Start a worker
await node.start_worker("worker_name", WorkerClass())

# List all workers
workers = await node.list_workers()

# Stop a worker
await node.stop_worker("worker_name")
```

### Messages

**Message Types**

Messages must be strings. This is because the messaging layer transmits
messages across the network, and strings are simple, universal, and work everywhere.

```python theme={null}
# Correct - string message
await node.send_message("worker", "hello")

# Wrong - will raise TypeError
await node.send_message("worker", {"data": 123})  # ❌
```

**Structured Data with JSON**

For structured data, use JSON serialization:

```python theme={null}
import json

class Calculator:
    async def handle_message(self, context, message):
        # Parse incoming JSON
        data = json.loads(message)
        result = data["x"] + data["y"]
        
        # Reply with JSON
        await context.reply(json.dumps({"result": result}))


async def main(node):
    await node.start_worker("calc", Calculator())
    
    # Send structured data as JSON
    data = json.dumps({"x": 10, "y": 32})
    reply = await node.send_and_receive("calc", data, timeout=10)
    
    # Parse the reply
    result = json.loads(reply)
    print(f"Result: {result['result']}")  # 42
```
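
When several message shapes are in play, small typed wrappers can keep the JSON handling in one place. The following is one possible approach using only the standard library. `AddRequest`, `AddResult`, `encode`, and `decode` are hypothetical helpers, not part of Autonomy.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class AddRequest:
    x: int
    y: int


@dataclass
class AddResult:
    result: int


def encode(message) -> str:
    # Dataclass instance -> JSON string suitable for sending.
    return json.dumps(asdict(message))


def decode(cls, raw: str):
    # JSON string -> dataclass instance. Raises TypeError on missing or
    # unexpected fields and json.JSONDecodeError on malformed input.
    return cls(**json.loads(raw))


wire = encode(AddRequest(x=10, y=32))
request = decode(AddRequest, wire)
reply = encode(AddResult(result=request.x + request.y))
print(wire)   # {"x": 10, "y": 32}
print(reply)  # {"result": 42}
```

The strings produced by `encode` can be passed to `send_and_receive()` or `context.reply()` like any other string message.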

**Message Flow**

Messages flow asynchronously between workers:

1. Sender sends message to worker by name
2. Message is queued if worker is busy
3. Worker processes messages one at a time
4. Worker can optionally reply
5. Sender receives reply (if waiting)

### State

**Stateful Workers**

Workers can maintain state across messages. This is safe because workers process one message at a time:

```python theme={null}
class Counter:
    def __init__(self):
        self.count = 0
    
    async def handle_message(self, context, message):
        if message == "increment":
            self.count += 1
        await context.reply(f"Count: {self.count}")


async def main(node):
    await node.start_worker("counter", Counter())
    
    # Each message sees the updated state
    print(await node.send_and_receive("counter", "increment", timeout=10))  # Count: 1
    print(await node.send_and_receive("counter", "increment", timeout=10))  # Count: 2
    print(await node.send_and_receive("counter", "increment", timeout=10))  # Count: 3
```

**Use cases for stateful workers:**

* Session management
* Connection pooling
* Rate limiting
* Caching
* Accumulating results
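
As an illustration of one of these use cases, here is a sketch of a fixed-window rate limiter written as a worker class. It follows the `handle_message(self, context, message)` shape shown above and treats each message as a client ID. `StubContext` is a hypothetical stand-in for the runtime's context so the example runs locally; the `limit`, `window`, and `clock` parameters are choices made for this sketch.

```python
import asyncio
import time


class RateLimiter:
    """Allows at most `limit` messages per client in each `window` seconds."""

    def __init__(self, limit=3, window=60.0, clock=time.monotonic):
        self.limit = limit
        self.window = window
        self.clock = clock
        self.windows = {}  # client id -> (window start, count)

    async def handle_message(self, context, message):
        now = self.clock()
        start, count = self.windows.get(message, (now, 0))
        if now - start >= self.window:
            start, count = now, 0  # window expired, start a new one
        allowed = count < self.limit
        if allowed:
            count += 1
        self.windows[message] = (start, count)
        await context.reply("allowed" if allowed else "denied")


class StubContext:
    """Hypothetical stand-in for the runtime's context, for local testing."""

    def __init__(self):
        self.replies = []

    async def reply(self, message):
        self.replies.append(message)


async def demo():
    limiter = RateLimiter(limit=2, window=60.0)
    context = StubContext()
    for _ in range(3):
        await limiter.handle_message(context, "client-a")
    await limiter.handle_message(context, "client-b")
    return context.replies


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ['allowed', 'allowed', 'denied', 'allowed']
```

No lock is needed around `self.windows` because the worker handles one message at a time.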

**State Isolation**

Workers share no state. Each worker is completely isolated:

```python theme={null}
# Each worker has its own state
await node.start_worker("counter-1", Counter())
await node.start_worker("counter-2", Counter())
# These two counters are completely independent
```

Workers are single-threaded - state is safe within a worker:

```python theme={null}
class Safe:
    def __init__(self):
        self.count = 0  # Safe - only one message at a time
    
    async def handle_message(self, context, message):
        self.count += 1  # No race conditions
```

**Initialization**

Use `__init__()` for one-time setup:

```python theme={null}
class Database:
    def __init__(self):
        self.connection = create_connection()  # Set up once (create_connection is a placeholder)
    
    async def handle_message(self, context, message):
        result = self.connection.query(message)
        await context.reply(result)
```

***

## Communication

### Message Patterns

**Fire and Forget**

Send a message without waiting for a reply:

```python theme={null}
# Just send it and move on
await node.send_message("logger", "Application started")
```

**Request-Reply**

Send a message and wait for a response:

```python theme={null}
# Wait for the worker to respond (timeout in seconds)
reply = await node.send_and_receive("calculator", "2+2", timeout=10)
print(reply)  # "4"
```

**Important:** Always specify a timeout (in seconds) to prevent hanging forever if the worker doesn't respond.

**Timeout Handling**

Handle timeouts gracefully:

```python theme={null}
try:
    reply = await node.send_and_receive("worker", message, timeout=10)
    print(f"Got reply: {reply}")
except RuntimeError as e:
    if "timeout" in str(e).lower():
        print("Worker didn't respond in time")
    else:
        raise
```

### Error Handling

**Timeout Errors**

**Symptoms:** `RuntimeError` with "timeout" in the message

**Possible causes:**

* Worker doing heavy computation (increase timeout)
* Worker crashed (check logs)
* Network issues (for distributed workers)
* Worker stuck on previous message

```python theme={null}
# Use longer timeout for complex operations (timeout in seconds)
try:
    reply = await node.send_and_receive("worker", msg, timeout=30)
except RuntimeError as e:
    if "timeout" in str(e).lower():
        print("Operation timed out after 30 seconds")
    else:
        raise
```

**Recovery Strategies**

Implement fallback strategies for resilience:

```python theme={null}
try:
    reply = await node.send_and_receive("primary", msg, timeout=5)
except RuntimeError as e:
    if "timeout" in str(e).lower():
        # Have a fallback strategy
        reply = await node.send_and_receive("backup", msg, timeout=5)
    else:
        raise
```
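
The same idea extends to an ordered list of candidates. The helper below is a sketch, not a built-in. It assumes, as in the examples above, that timeouts surface as a `RuntimeError` whose message contains "timeout". `send_with_fallback` and `StubNode` are hypothetical names; the stub exists only so the example runs without a real node.

```python
import asyncio


async def send_with_fallback(node, worker_names, message, timeout=5):
    """Try each worker in order; return (name, reply) from the first that answers."""
    last_error = None
    for name in worker_names:
        try:
            reply = await node.send_and_receive(name, message, timeout=timeout)
            return name, reply
        except RuntimeError as e:
            if "timeout" not in str(e).lower():
                raise  # not a timeout: surface it immediately
            last_error = e
    raise RuntimeError(f"timeout: no worker responded: {worker_names}") from last_error


class StubNode:
    """Hypothetical stand-in for a node: 'primary' always times out."""

    async def send_and_receive(self, name, message, timeout):
        if name == "primary":
            raise RuntimeError("Timeout waiting for reply")
        return f"{name} handled {message}"


async def demo():
    return await send_with_fallback(StubNode(), ["primary", "backup"], "job-1")


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ('backup', 'backup handled job-1')
```

Returning the worker name along with the reply lets the caller log which candidate actually served the request.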

***

## Distribution

### Architecture

**Nodes and Pods**

Workers can run on different machines and communicate seamlessly. This is one of Autonomy's most powerful features.

```
Main Pod (coordinator)
├── Node running main.py
└── Starts workers on remote nodes

Runner Pods (3 clones = 3 machines)
├── Runner 1: Workers processing tasks
├── Runner 2: Workers processing tasks
└── Runner 3: Workers processing tasks
```

**Clones Configuration**

Use `clones` to create multiple machines running the same container:

```yaml autonomy.yaml theme={null}
name: distribute
pods:
  - name: main-pod
    public: true
    containers:
      - name: main
        image: main
  
  - name: runner-pod
    clones: 3  # Creates 3 separate machines
    containers:
      - name: runner
        image: runner
```

### Implementation

**Node Discovery**

Discover and connect to remote nodes:

```python theme={null}
from autonomy import Node, Zone

async def main(node):
    # Discover all runner nodes
    runners = await Zone.nodes(node, filter="runner")
    print(f"Found {len(runners)} runner nodes")
    
    # Get all nodes (no filter)
    all_nodes = await Zone.nodes(node)
    print(f"Total nodes: {len(all_nodes)}")
```

**Remote Worker Management**

Start and manage workers on remote nodes:

```python images/main/main.py theme={null}
from autonomy import Node, Zone


class DataProcessor:
    async def handle_message(self, context, message):
        # Process data here
        result = f"Processed: {message}"
        await context.reply(result)


async def main(node):
    # Discover all runner nodes
    runners = await Zone.nodes(node, filter="runner")
    print(f"Found {len(runners)} runner nodes")
    
    # Start a worker on each runner
    for i, runner in enumerate(runners):
        await runner.start_worker(f"processor-{i}", DataProcessor())
    
    # Send work to each worker
    for i in range(len(runners)):
        result = await node.send_and_receive(f"processor-{i}", f"task-{i}", timeout=10)
        print(result)


Node.start(main)
```

**What's happening:**

1. `Zone.nodes()` discovers all nodes matching a filter.
2. A worker is started on each remote node.
3. Messages are sent to workers by name, even though they run on different machines.
4. Messages are automatically routed across the network.

***

### Complete Multi-Node Example

Here's a complete working example that demonstrates communication between nodes running on different pods.

**Architecture:**

```
Main Pod (client)
└── Sends messages to runner

Runner Pod (runner)
└── Runs greeter worker that responds to messages
```

**Configuration:**

```yaml autonomy.yaml theme={null}
name: multinode
pods:
  - name: main-pod
    size: small
    public: true
    containers:
      - name: client
        image: client

  - name: runner-pod
    size: small
    containers:
      - name: runner
        image: runner
```

**Runner Node:**

```python images/runner/main.py theme={null}
from autonomy import Node, info

class Greeter:
    async def handle_message(self, context, message):
        info(f"Greeter received: {message}")
        reply = f"Hello! You said: {message}"
        await context.reply(reply)

async def main(node):
    info("Starting runner node...")
    await node.start_worker("greeter", Greeter())
    info("Runner is ready with greeter worker")

Node.start(main)
```

**Client Node:**

```python images/client/main.py theme={null}
from autonomy import Node, Zone, info
import asyncio

async def main(node):
    info("Client node started")

    # Wait for runner to be ready with retry loop
    info("Waiting for runner to be ready...")
    runners = []
    max_retries = 10
    for attempt in range(max_retries):
        runners = await Zone.nodes(node, filter="runner")
        if runners:
            break
        info(f"No runner nodes found yet, retrying... (attempt {attempt + 1}/{max_retries})")
        await asyncio.sleep(2)

    if not runners:
        info("No runner nodes found after waiting!")
        return

    runner = runners[0]
    info(f"Connected to runner: {runner.name}")

    # Send messages to the greeter worker
    messages = [
        "Hello from client",
        "How are you?",
        "This is a test message",
        "Goodbye!"
    ]

    for i, message in enumerate(messages, 1):
        info(f"Sending message {i}: {message}")

        try:
            reply = await runner.send_and_receive("greeter", message, timeout=30)
            info(f"Received reply {i}: {reply}")
        except Exception as e:
            info(f"Error sending/receiving message {i}: {e}")

        # Small delay between messages
        await asyncio.sleep(1)

    info("Client completed all messages")

Node.start(main)
```

**Dockerfiles:**

```dockerfile images/runner/Dockerfile theme={null}
FROM ghcr.io/build-trust/autonomy-python
COPY . .
ENTRYPOINT ["python", "main.py"]
```

```dockerfile images/client/Dockerfile theme={null}
FROM ghcr.io/build-trust/autonomy-python
COPY . .
ENTRYPOINT ["python", "main.py"]
```

**Deploy and Test:**

```bash theme={null}
# Deploy the zone
autonomy zone deploy

# View logs to see the communication
autonomy zone inlet --to logs > /tmp/logs.log 2>&1 &
sleep 5

# Open logs in browser
open http://127.0.0.1:32101
```

**Expected Output:**

Client logs show:

```
INFO node Waiting for runner to be ready...
INFO node Connected to runner: a9eb812238f753132652ae09963a05e9-multinode-runner
INFO node Sending message 1: Hello from client
INFO node Received reply 1: Hello! You said: Hello from client
INFO node Sending message 2: How are you?
INFO node Received reply 2: Hello! You said: How are you?
...
```

Runner logs show:

```
INFO node Starting runner node...
INFO node Runner is ready with greeter worker
INFO node Greeter received: Hello from client
INFO node Greeter received: How are you?
...
```

**Key Points:**

* Use `Zone.nodes(node, filter="runner")` to discover nodes by pod name prefix
* Implement a retry loop to wait for nodes to become available
* Messages are automatically routed across pods and machines
* Workers on remote nodes are accessed the same way as local workers
* The `filter` parameter matches against pod names (e.g., "runner" matches "runner-pod")
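
The retry loop from the client can be factored into a small reusable helper. This is a sketch with hypothetical names (`wait_for_nodes`, `fake_discover`). It takes any zero-argument async callable, so it can be exercised without a running zone.

```python
import asyncio


async def wait_for_nodes(discover, max_retries=10, delay=2.0):
    """Call `discover()` until it returns a non-empty list or retries run out."""
    for attempt in range(1, max_retries + 1):
        nodes = await discover()
        if nodes:
            return nodes
        if attempt < max_retries:
            await asyncio.sleep(delay)
    return []


async def demo():
    calls = 0

    async def fake_discover():
        # Hypothetical discovery: empty twice, then one node appears.
        nonlocal calls
        calls += 1
        return ["runner-1"] if calls >= 3 else []

    nodes = await wait_for_nodes(fake_discover, max_retries=5, delay=0.01)
    return nodes, calls


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (['runner-1'], 3)
```

In a real client, the callable would wrap the discovery call shown above, for example `lambda: Zone.nodes(node, filter="runner")`.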

***

## Patterns

**Echo Worker** - Simple message echo:

```python theme={null}
class Echo:
    async def handle_message(self, context, message):
        await context.reply(message)

await node.start_worker("echo", Echo())
reply = await node.send_and_receive("echo", "hello", timeout=10)
```

**Stateful Counter** - Maintains count across messages:

```python theme={null}
class Counter:
    def __init__(self):
        self.count = 0
    
    async def handle_message(self, context, message):
        if message == "increment":
            self.count += 1
        elif message == "decrement":
            self.count -= 1
        elif message == "reset":
            self.count = 0
        await context.reply(str(self.count))
```

**JSON Message Handler** - Structured message processing:

```python theme={null}
import json


class CommandHandler:
    async def handle_message(self, context, message):
        command = json.loads(message)
        action = command.get("action")

        # process() and validate() are coroutines you define on this class;
        # each should return a JSON-serializable value
        if action == "process":
            result = await self.process(command.get("data"))
        elif action == "validate":
            result = await self.validate(command.get("data"))
        else:
            result = {"error": "Unknown action"}

        await context.reply(json.dumps(result))
```

**In-Memory Store** - Key-value storage pattern:

```python theme={null}
import json


class InMemoryStore:
    def __init__(self):
        self.store = {}

    async def handle_message(self, context, message):
        # Parse command: {"action": "get|set", "key": "...", "value": "..."}
        command = json.loads(message)
        action = command.get("action")
        key = command.get("key")

        if action == "set":
            self.store[key] = command.get("value")
            await context.reply(json.dumps({"status": "ok"}))
        elif action == "get":
            value = self.store.get(key)
            await context.reply(json.dumps({"value": value}))
        else:
            # Always reply so a waiting sender does not time out
            await context.reply(json.dumps({"error": "Unknown action"}))

# Usage
await node.start_worker("store", InMemoryStore())

# Set a value
await node.send_and_receive("store", json.dumps({
    "action": "set",
    "key": "name",
    "value": "Alice"
}), timeout=10)

# Get a value
reply = await node.send_and_receive("store", json.dumps({
    "action": "get",
    "key": "name"
}), timeout=10)

result = json.loads(reply)
print(result["value"])  # "Alice"
```

**Distributed Processing** - Round-robin processing across multiple machines:

```python theme={null}
import json

from autonomy import Zone


async def distributed_process(node, data_chunks):
    # Discover runner nodes
    runners = await Zone.nodes(node, filter="runner")
    if not runners:
        raise RuntimeError("No runner nodes found")

    # Start processor on each runner
    # (DataProcessor is assumed to accept and reply with JSON strings)
    for i, runner in enumerate(runners):
        await runner.start_worker(f"processor-{i}", DataProcessor())

    # Distribute work; each reply is awaited before the next chunk is sent
    results = []
    for i, chunk in enumerate(data_chunks):
        worker_id = i % len(runners)  # Round-robin distribution
        result = await node.send_and_receive(
            f"processor-{worker_id}",
            json.dumps(chunk),
            timeout=30
        )
        results.append(json.loads(result))

    return results
```
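
The loop above awaits each reply before sending the next chunk, so the runners work one after another. If issuing several requests at once from a single node is supported (an assumption this page does not confirm), `asyncio.gather` could overlap them. The sketch below demonstrates the idea against `StubNode`, a hypothetical stand-in that answers after a short delay; `dispatch_concurrently` is likewise an illustrative name.

```python
import asyncio
import json
import time


class StubNode:
    """Hypothetical stand-in for a node; each request takes 0.1s to answer."""

    async def send_and_receive(self, name, message, timeout):
        await asyncio.sleep(0.1)
        return json.dumps({"worker": name, "echo": json.loads(message)})


async def dispatch_concurrently(node, worker_names, data_chunks, timeout=30):
    # Build one request per chunk, assigned round-robin, then await them together.
    requests = [
        node.send_and_receive(
            worker_names[i % len(worker_names)], json.dumps(chunk), timeout=timeout
        )
        for i, chunk in enumerate(data_chunks)
    ]
    replies = await asyncio.gather(*requests)  # results keep input order
    return [json.loads(reply) for reply in replies]


async def demo():
    start = time.perf_counter()
    results = await dispatch_concurrently(
        StubNode(), ["processor-0", "processor-1"], [{"n": n} for n in range(6)]
    )
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(demo())
    print(len(results), f"{elapsed:.2f}s")
```

With the stub, six requests of 0.1 seconds each complete in roughly 0.1 seconds in total, and the results stay in the same order as the input chunks.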

**Fire-and-Forget Logger** - No-reply logging pattern:

```python theme={null}
from datetime import datetime


class Logger:
    def __init__(self):
        self.log_file = open("app.log", "a")
    
    async def handle_message(self, context, message):
        timestamp = datetime.now().isoformat()
        self.log_file.write(f"[{timestamp}] {message}\n")
        self.log_file.flush()
        # No reply needed

# Usage
await node.start_worker("logger", Logger())
await node.send_message("logger", "Application started")
await node.send_message("logger", "User logged in")
```

**Error Handling with Cleanup** - Always clean up resources:

```python theme={null}
async def process_with_cleanup(node, message):
    # Processor is your own worker class; Zone is imported from autonomy
    runners = await Zone.nodes(node, filter="runner")
    workers = []
    
    try:
        # Create workers
        for i, runner in enumerate(runners):
            worker_name = f"worker-{i}"
            await runner.start_worker(worker_name, Processor())
            workers.append((runner, worker_name))
        
        # Process with error handling
        results = []
        for runner, worker_name in workers:
            try:
                reply = await node.send_and_receive(worker_name, message, timeout=10)
                results.append(reply)
            except RuntimeError as e:
                if "timeout" in str(e).lower():
                    print(f"Worker {worker_name} timed out")
                    continue
                raise
        
        return results
    
    finally:
        # Always cleanup
        for runner, worker_name in workers:
            try:
                await runner.stop_worker(worker_name)
            except Exception as e:
                print(f"Cleanup warning: {e}")
```

**Worker Monitoring** - Get a real-time view of all workers:

```python theme={null}
from autonomy import Node, Zone


async def list_all_workers(node):
    all_nodes = await Zone.nodes(node)
    
    for n in all_nodes:
        workers = await n.list_workers()
        print(f"Node {n.name}: {len(workers)} workers")
        for worker in workers:
            print(f"  - {worker}")
```

**Resource Management** - Manage limited resources across workers:

```python theme={null}
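import json


class ResourcePool:
    def __init__(self, max_resources=10):
        self.available = max_resources
        self.in_use = {}  # client_id -> number of resources held

    async def handle_message(self, context, message):
        command = json.loads(message)
        action = command.get("action")
        client_id = command.get("client_id")

        if action == "acquire":
            if self.available > 0:
                self.available -= 1
                # Add to the client's tally so repeated acquires are not lost
                self.in_use[client_id] = self.in_use.get(client_id, 0) + 1
                await context.reply(json.dumps({"status": "granted"}))
            else:
                await context.reply(json.dumps({"status": "denied"}))

        elif action == "release":
            if client_id in self.in_use:
                self.available += self.in_use.pop(client_id)
                await context.reply(json.dumps({"status": "released"}))
            else:
                # Always reply so a waiting sender does not time out
                await context.reply(json.dumps({"status": "not_held"}))

        else:
            await context.reply(json.dumps({"error": "Unknown action"}))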
```

***

## Operations

### Monitoring

**Listing Workers** - Monitor active workers across your system:

```python theme={null}
# List workers on current node
local_workers = await node.list_workers()
print(f"Local workers: {local_workers}")

# List workers on all nodes
all_nodes = await Zone.nodes(node)
for n in all_nodes:
    workers = await n.list_workers()
    print(f"{n.name}: {workers}")
```

**Tracking Messages** - Add logging to track message flow:

```python theme={null}
from datetime import datetime


class TrackedWorker:
    async def handle_message(self, context, message):
        print(f"[{datetime.now()}] Received: {message[:100]}")

        # Process message (process() is your own coroutine that returns a string)
        result = await self.process(message)
        
        print(f"[{datetime.now()}] Replying: {result[:100]}")
        await context.reply(result)
```

**System Health** - Monitor system health and performance:

```python theme={null}
async def health_check(node):
    health_status = {}
    
    # Check all nodes are reachable
    try:
        nodes = await Zone.nodes(node)
        health_status["nodes"] = len(nodes)
    except Exception as e:
        health_status["nodes_error"] = str(e)
    
    # Check critical workers
    critical_workers = ["database", "cache", "processor"]
    for worker in critical_workers:
        try:
            await node.send_and_receive(worker, "ping", timeout=5)
            health_status[worker] = "healthy"
        except Exception:
            health_status[worker] = "unhealthy"
    
    return health_status
```

### Troubleshooting

**Worker Not Responding**

**Symptoms:** `send_and_receive()` times out

**Solutions:**

1. Check worker was started successfully: `await node.list_workers()`
2. Increase timeout for complex operations
3. Check logs for errors in worker's `handle_message()`
4. Verify message format (must be a string)

**Timeout Issues** - Debug timeout problems:

```python theme={null}
import time


class DebugWorker:
    async def handle_message(self, context, message):
        print(f"Started processing: {message}")
        start_time = time.time()
        
        # Your processing here
        result = await self.process(message)
        
        elapsed = time.time() - start_time
        print(f"Completed in {elapsed:.2f} seconds")
        
        await context.reply(result)
```

**State Problems** - Debug state issues:

```python theme={null}
class DebugState:
    def __init__(self):
        self.count = 0
        self.history = []
    
    async def handle_message(self, context, message):
        self.count += 1
        self.history.append(message)
        
        # Log current state
        print(f"Message #{self.count}: {message}")
        print(f"History length: {len(self.history)}")
        
        await context.reply(f"Processed message #{self.count}")
```

**Serialization Errors**

**Symptom:** `TypeError` when sending messages

**Cause:** Trying to send non-string messages

**Solution:**

```python theme={null}
import json

# Wrong
await node.send_and_receive("worker", {"data": 123}, timeout=10)  # ❌

# Right
await node.send_and_receive("worker", json.dumps({"data": 123}), timeout=10)  # ✅
```
