Autonomy is designed on principles from the actor model, a powerful
pattern for building concurrent and distributed systems. Before we dive into
agents, it’s helpful to understand this foundation of actors.

Think of actors as independent members of a team. Each has their own
workspace (state), and they communicate only by passing notes (messages) to each
other. No one shares their workspace, which prevents contention and makes
applications naturally parallelizable and scalable.

Actors are lightweight, stateful objects that communicate using messages.

Each actor has:
A unique address that other actors use to send it messages.
A mailbox where incoming messages queue up and wait to be processed.
Internal state that only this actor can access or modify.
Behavior, logic that defines how this actor reacts to each message it receives.
When an actor receives a message, it can:
Send messages to other actors (asynchronously, without waiting for responses).
Create new actors and delegate work to them.
Update its internal state which may affect how future messages are handled.
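
The four parts of an actor and the three things it can do on receiving a message can be sketched in plain Python. This is a toy illustration built on `asyncio`, not Autonomy's runtime or API: the `Actor` and `Forwarder` classes, the `registry` dictionary, and the `None` stop sentinel are all hypothetical names chosen for this example.

```python
import asyncio


class Actor:
    """A toy actor: an address, a mailbox, private state, and behavior."""

    def __init__(self, address, registry):
        self.address = address            # unique name other actors send to
        self.mailbox = asyncio.Queue()    # incoming messages wait here
        self.registry = registry          # hypothetical address -> actor lookup
        self.received = []                # internal state, private to this actor
        registry[address] = self

    def send(self, address, message):
        # Asynchronous send: enqueue the message and return immediately.
        self.registry[address].mailbox.put_nowait(message)

    async def run(self):
        # Process one message at a time, in arrival order.
        while True:
            message = await self.mailbox.get()
            if message is None:           # sentinel that stops the actor
                break
            await self.on_message(message)

    async def on_message(self, message):
        # Behavior: update private state. Subclasses may override this.
        self.received.append(message)


class Forwarder(Actor):
    """Behavior that records each message and passes it on to another actor."""

    def __init__(self, address, registry, target):
        super().__init__(address, registry)
        self.target = target

    async def on_message(self, message):
        self.received.append(message)
        self.send(self.target, f"forwarded: {message}")


async def demo():
    registry = {}
    sink = Actor("sink", registry)
    relay = Forwarder("relay", registry, target="sink")
    sink_task = asyncio.create_task(sink.run())
    relay_task = asyncio.create_task(relay.run())

    relay.mailbox.put_nowait("hello")
    relay.mailbox.put_nowait("world")
    relay.mailbox.put_nowait(None)        # stop the relay after its queue drains
    await relay_task
    sink.mailbox.put_nowait(None)         # relay is done, so stop the sink
    await sink_task
    return sink.received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The relay never touches the sink's `received` list directly; the only way it affects the sink is by putting a message in its mailbox.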
No shared state

Each actor’s state is completely private. Other actors cannot directly access
or modify it. This eliminates entire classes of concurrency bugs, such as data races
on shared memory and deadlocks over shared locks. Actors can only influence each other by sending messages.

Message Processing

While many actors run in parallel across the system, each individual actor
processes only one message at a time. Messages wait in the actor’s mailbox and
are handled sequentially. This means you never need to worry about two threads
modifying an actor’s state simultaneously; it simply can’t happen.

Asynchronous message passing

When Actor A sends a message to Actor B, it doesn’t have to wait for B to
process it or respond. The message is queued in B’s mailbox, and A can continue
with its work immediately. This non-blocking communication allows actors to work
concurrently without waiting on each other.

Location Transparency

Sending a message to another actor, or receiving one from it, looks the
same whether that other actor is running on the same machine or across the network.
This makes it natural to distribute work and scale horizontally.

In Autonomy:
Simple actors are called workers.
Agents are intelligent autonomous actors that use large language models.
Both follow the actor model.
When an actor is idle and there are no messages in its mailbox,
it consumes no CPU. This design makes it easy to run thousands of concurrent
stateful actors that make efficient use of available CPU cores.

Agents, for example, spend most of their lifespan waiting for calls
to language models or tools to finish. While one agent is waiting, the actor
runtime automatically gives the CPU core to a different actor that has a
new message to process.

This enables highly efficient and horizontally scalable applications.
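
The effect of idle actors yielding the CPU can be illustrated with plain `asyncio`, used here only as a stand-in for the idea (Autonomy's runtime is implemented in Rust, and this sketch does not show how it schedules work). The function names `waiting_actor` and `run_many` are hypothetical. Each simulated actor waits on a pretend model call; because waiting costs no CPU, many of them finish in roughly the time of a single wait.

```python
import asyncio
import time


async def waiting_actor(name, delay, log):
    # Simulate an agent waiting on a model or tool call.
    # While this coroutine sleeps it uses no CPU, and the event loop
    # is free to run any other coroutine that has work to do.
    await asyncio.sleep(delay)
    log.append(name)


async def run_many(count=1000, delay=0.1):
    log = []
    started = time.perf_counter()
    await asyncio.gather(
        *(waiting_actor(f"actor-{i}", delay, log) for i in range(count))
    )
    elapsed = time.perf_counter() - started
    return len(log), elapsed


if __name__ == "__main__":
    finished, elapsed = asyncio.run(run_many())
    print(f"{finished} simulated actors finished in {elapsed:.2f}s")
```

Run sequentially, 1000 waits of 0.1 seconds would take about 100 seconds; interleaved, the total stays close to the length of one wait.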
Workers are Autonomy’s implementation of actors.

Autonomy’s actor runtime is implemented in Rust and exposed to Python code
through the Node class. All nodes in a zone can create encrypted, mutually authenticated
communication channels with other nodes.

Here’s a simple worker that replies to each message with a greeting:

    from autonomy import Node


    class Greeter:
        async def handle_message(self, context, message):
            await context.reply(f"Hello, {message}!")


    async def main(node):
        # Start the worker
        await node.start_worker("greeter", Greeter())

        # Send a message and wait for reply
        reply = await node.send_and_receive("greeter", "Alice", timeout=10)
        print(reply)  # "Hello, Alice!"


    Node.start(main)

What’s happening:
Define a worker class with handle_message().
When Node.start(main) is called, it turns the main function itself into a worker that can send and receive messages like other workers. This is why main is able to communicate with greeter.
Start the Greeter worker with a unique name (greeter).
Send a message from main to the greeter worker.
The greeter worker processes the message and replies.
The main worker receives the reply.

Worker Lifecycle

Workers can be started and stopped dynamically:

    # Start a worker
    await node.start_worker("worker_name", WorkerClass())

    # List all workers
    workers = await node.list_workers()

    # Stop a worker
    await node.stop_worker("worker_name")

Message Types

Messages must be strings. This is because the messaging layer transmits
messages across the network, and strings are simple, universal, and work everywhere.
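
Because messages are strings, structured data has to be serialized before sending and parsed after receiving. A minimal sketch using JSON from the standard library follows; the helper names `encode_message` and `decode_message` are hypothetical and not part of Autonomy.

```python
import json


def encode_message(payload):
    # Serialize a dict or list into a string suitable for sending.
    return json.dumps(payload, sort_keys=True)


def decode_message(message):
    # Parse a received string back into Python data.
    return json.loads(message)


if __name__ == "__main__":
    outgoing = encode_message({"action": "resize", "width": 640, "height": 480})
    print(outgoing)
    print(decode_message(outgoing)["width"])
```

A worker's handle_message() would call `decode_message(message)` on the way in and pass `encode_message(...)` to context.reply() on the way out.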

Stateful Workers

Workers can maintain state across messages. This is safe because workers process one message at a time:

    class Counter:
        def __init__(self):
            self.count = 0

        async def handle_message(self, context, message):
            if message == "increment":
                self.count += 1
            await context.reply(f"Count: {self.count}")


    async def main(node):
        await node.start_worker("counter", Counter())

        # Each message sees the updated state
        print(await node.send_and_receive("counter", "increment", timeout=10))  # Count: 1
        print(await node.send_and_receive("counter", "increment", timeout=10))  # Count: 2
        print(await node.send_and_receive("counter", "increment", timeout=10))  # Count: 3

Use cases for stateful workers:
Session management
Connection pooling
Rate limiting
Caching
Accumulating results
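
As one illustration of these use cases, here is a sketch of a rate-limiting worker. It follows the handle_message(self, context, message) shape shown above, but the `RateLimiter` class, its `limit`, `window`, and `clock` parameters, and the `FakeContext` stand-in are assumptions made for this example so it can run without a node.

```python
import asyncio
import time


class RateLimiter:
    """Allows at most `limit` messages per `window` seconds."""

    def __init__(self, limit=3, window=1.0, clock=time.monotonic):
        self.limit = limit
        self.window = window
        self.clock = clock          # injectable so the worker is easy to test
        self.timestamps = []        # state kept between messages

    async def handle_message(self, context, message):
        now = self.clock()
        # Drop timestamps that have fallen out of the window.
        self.timestamps = [t for t in self.timestamps if now - t < self.window]
        if len(self.timestamps) < self.limit:
            self.timestamps.append(now)
            await context.reply("allowed")
        else:
            await context.reply("denied")


class FakeContext:
    """Hypothetical stand-in for the context object, for local testing only."""

    def __init__(self):
        self.replies = []

    async def reply(self, text):
        self.replies.append(text)


async def demo():
    fake_time = [0.0]
    limiter = RateLimiter(limit=2, window=1.0, clock=lambda: fake_time[0])
    context = FakeContext()
    for _ in range(3):
        await limiter.handle_message(context, "request")
    fake_time[0] = 1.5              # move past the window
    await limiter.handle_message(context, "request")
    return context.replies


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

No lock protects `self.timestamps`: the worker handles one message at a time, so the read-filter-append sequence cannot interleave with another message.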

State Isolation

No shared state between workers - each worker is completely isolated:

    # Each worker has its own state
    await node.start_worker("counter-1", Counter())
    await node.start_worker("counter-2", Counter())
    # These two counters are completely independent

Workers are single-threaded - state is safe within a worker:

    class Safe:
        def __init__(self):
            self.count = 0  # Safe - only one message at a time

        async def handle_message(self, context, message):
            self.count += 1  # No race conditions

Initialization

Use __init__() for one-time setup:

    class Database:
        def __init__(self):
            # create_connection() stands in for your own database client setup
            self.connection = create_connection()  # Setup once

        async def handle_message(self, context, message):
            result = self.connection.query(message)
            await context.reply(result)

Timeout Errors

Symptoms: RuntimeError with “timeout” in the message

Possible causes:
Worker doing heavy computation (increase timeout)
Worker crashed (check logs)
Network issues (for distributed workers)
Worker stuck on previous message

    # Use longer timeout for complex operations (timeout in seconds)
    try:
        reply = await node.send_and_receive("worker", msg, timeout=30)
    except RuntimeError as e:
        if "timeout" in str(e).lower():
            print("Operation timed out after 30 seconds")
        else:
            raise

Recovery Strategies

Implement fallback strategies for resilience:

    try:
        reply = await node.send_and_receive("primary", msg, timeout=5)
    except RuntimeError as e:
        if "timeout" in str(e).lower():
            # Have a fallback strategy
            reply = await node.send_and_receive("backup", msg, timeout=5)
        else:
            raise

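
Retrying the same worker with a growing delay is another common recovery strategy alongside a fallback worker. The sketch below is not an Autonomy feature: `send_with_retry`, its `attempts` and `base_delay` parameters, and the `flaky_send` test double are hypothetical. It assumes, as the snippets above do, that a timeout surfaces as a RuntimeError whose message contains "timeout".

```python
import asyncio


async def send_with_retry(send, message, attempts=3, base_delay=0.1):
    """Call `send(message)` and retry on timeout with exponential backoff.

    `send` is any coroutine function; with Autonomy it could wrap a call to
    node.send_and_receive(...). Non-timeout errors are re-raised at once.
    """
    for attempt in range(attempts):
        try:
            return await send(message)
        except RuntimeError as e:
            is_timeout = "timeout" in str(e).lower()
            if not is_timeout or attempt == attempts - 1:
                raise
            # Wait base_delay, then 2x, then 4x, ... before the next attempt.
            await asyncio.sleep(base_delay * 2 ** attempt)


async def demo():
    calls = []

    async def flaky_send(message):
        # Hypothetical sender that times out twice, then succeeds.
        calls.append(message)
        if len(calls) < 3:
            raise RuntimeError("Timeout waiting for reply")
        return f"ok: {message}"

    reply = await send_with_retry(flaky_send, "ping", attempts=3, base_delay=0.01)
    return reply, len(calls)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Retries are only appropriate when handling the same message twice is harmless, since a timed-out message may still have been processed by the worker.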
Node Discovery

Discover and connect to remote nodes:

    from autonomy import Node, Zone


    async def main(node):
        # Discover all runner nodes
        runners = await Zone.nodes(node, filter="runner")
        print(f"Found {len(runners)} runner nodes")

        # Get all nodes (no filter)
        all_nodes = await Zone.nodes(node)
        print(f"Total nodes: {len(all_nodes)}")

Remote Worker Management

Start and manage workers on remote nodes:

images/main/main.py

    from autonomy import Node, Zone


    class DataProcessor:
        async def handle_message(self, context, message):
            # Process data here
            result = f"Processed: {message}"
            await context.reply(result)


    async def main(node):
        # Discover all runner nodes
        runners = await Zone.nodes(node, filter="runner")
        print(f"Found {len(runners)} runner nodes")

        # Start a worker on each runner
        for i, runner in enumerate(runners):
            await runner.start_worker(f"processor-{i}", DataProcessor())

        # Send work to each worker
        for i in range(len(runners)):
            result = await node.send_and_receive(f"processor-{i}", f"task-{i}", timeout=10)
            print(result)


    Node.start(main)

What’s happening:
Zone.nodes() discovers all nodes matching a filter
Start workers on each remote node
Send messages to workers by name—they run on different machines!
Messages are automatically routed across the network

Runner Node:

    from autonomy import Node, info


    class Greeter:
        async def handle_message(self, context, message):
            info(f"Greeter received: {message}")
            reply = f"Hello! You said: {message}"
            await context.reply(reply)


    async def main(node):
        info("Starting runner node...")
        await node.start_worker("greeter", Greeter())
        info("Runner is ready with greeter worker")


    Node.start(main)

Client Node:
images/client/main.py

    from autonomy import Node, Zone, info
    import asyncio


    async def main(node):
        info("Client node started")

        # Wait for runner to be ready with retry loop
        info("Waiting for runner to be ready...")
        runners = []
        max_retries = 10
        for attempt in range(max_retries):
            runners = await Zone.nodes(node, filter="runner")
            if runners:
                break
            info(f"No runner nodes found yet, retrying... (attempt {attempt + 1}/{max_retries})")
            await asyncio.sleep(2)

        if not runners:
            info("No runner nodes found after waiting!")
            return

        runner = runners[0]
        info(f"Connected to runner: {runner.name}")

        # Send messages to the greeter worker
        messages = [
            "Hello from client",
            "How are you?",
            "This is a test message",
            "Goodbye!",
        ]

        for i, message in enumerate(messages, 1):
            info(f"Sending message {i}: {message}")
            try:
                reply = await runner.send_and_receive("greeter", message, timeout=30)
                info(f"Received reply {i}: {reply}")
            except Exception as e:
                info(f"Error sending/receiving message {i}: {e}")

            # Small delay between messages
            await asyncio.sleep(1)

        info("Client completed all messages")


    Node.start(main)

Dockerfiles:

images/runner/Dockerfile

    FROM ghcr.io/build-trust/autonomy-python
    COPY . .
    ENTRYPOINT ["python", "main.py"]

images/client/Dockerfile

    FROM ghcr.io/build-trust/autonomy-python
    COPY . .
    ENTRYPOINT ["python", "main.py"]

Deploy and Test:

    # Deploy the zone
    autonomy zone deploy

    # View logs to see the communication
    autonomy zone inlet --to logs > /tmp/logs.log 2>&1 &
    sleep 5

    # Open logs in browser
    open http://127.0.0.1:32101

Expected Output:

Client logs show:

    INFO node Waiting for runner to be ready...
    INFO node Connected to runner: a9eb812238f753132652ae09963a05e9-multinode-runner
    INFO node Sending message 1: Hello from client
    INFO node Received reply 1: Hello! You said: Hello from client
    INFO node Sending message 2: How are you?
    INFO node Received reply 2: Hello! You said: How are you?
    ...

Runner logs show:

    INFO node Starting runner node...
    INFO node Runner is ready with greeter worker
    INFO node Greeter received: Hello from client
    INFO node Greeter received: How are you?
    ...

Key Points:
Use Zone.nodes(node, filter="runner") to discover nodes by pod name prefix
Implement a retry loop to wait for nodes to become available
Messages are automatically routed across pods and machines
Workers on remote nodes are accessed the same way as local workers
The filter parameter matches against pod names (e.g., “runner” matches “runner-pod”)
Distributed Processing - Parallel processing across multiple machines:

    import json

    from autonomy import Zone


    async def distributed_process(node, data_chunks):
        # Discover runner nodes
        runners = await Zone.nodes(node, filter="runner")
        if not runners:
            raise RuntimeError("No runner nodes found")

        # Start processor on each runner
        # (DataProcessor is assumed to accept and reply with JSON strings)
        for i, runner in enumerate(runners):
            await runner.start_worker(f"processor-{i}", DataProcessor())

        # Distribute work
        results = []
        for i, chunk in enumerate(data_chunks):
            worker_id = i % len(runners)  # Round-robin distribution
            result = await node.send_and_receive(
                f"processor-{worker_id}",
                json.dumps(chunk),
                timeout=30
            )
            results.append(json.loads(result))

        return results

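
The loop in distributed_process awaits each reply before sending the next chunk, so only one worker is busy at a time. One way to keep all workers busy is to issue the sends together with `asyncio.gather`. The sketch below shows that pattern in isolation: `dispatch_concurrently` is a hypothetical helper, and `send(worker_name, message)` is an assumed coroutine function that, with Autonomy, could wrap node.send_and_receive(...). The `fake_send` worker exists only so the example runs without a zone.

```python
import asyncio
import json


async def dispatch_concurrently(send, worker_names, data_chunks):
    """Send every chunk round-robin and await all replies together.

    `send(worker_name, message)` is a hypothetical coroutine function that
    returns the worker's reply as a JSON string.
    """
    if not worker_names:
        raise ValueError("at least one worker is required")

    async def one(i, chunk):
        name = worker_names[i % len(worker_names)]   # round-robin
        reply = await send(name, json.dumps(chunk))
        return json.loads(reply)

    # gather preserves input order, so results line up with data_chunks.
    return await asyncio.gather(*(one(i, c) for i, c in enumerate(data_chunks)))


async def demo():
    async def fake_send(name, message):
        # Stand-in for a remote worker: doubles each number in the chunk.
        await asyncio.sleep(0.01)
        numbers = json.loads(message)
        return json.dumps({"worker": name, "result": [n * 2 for n in numbers]})

    return await dispatch_concurrently(
        fake_send, ["processor-0", "processor-1"], [[1, 2], [3], [4, 5]]
    )


if __name__ == "__main__":
    for item in asyncio.run(demo()):
        print(item)
```

Chunks sent to the same worker still queue in its mailbox and are handled one at a time, so the speedup comes from different workers running in parallel.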
Error Handling with Cleanup - Always clean up resources:

    from autonomy import Zone


    # Processor is a worker class defined elsewhere in your application
    async def process_with_cleanup(node, message):
        runners = await Zone.nodes(node, filter="runner")
        workers = []

        try:
            # Create workers
            for i, runner in enumerate(runners):
                worker_name = f"worker-{i}"
                await runner.start_worker(worker_name, Processor())
                workers.append((runner, worker_name))

            # Process with error handling
            results = []
            for runner, worker_name in workers:
                try:
                    reply = await node.send_and_receive(worker_name, message, timeout=10)
                    results.append(reply)
                except RuntimeError as e:
                    if "timeout" in str(e).lower():
                        print(f"Worker {worker_name} timed out")
                        continue
                    raise

            return results

        finally:
            # Always cleanup
            for runner, worker_name in workers:
                try:
                    await runner.stop_worker(worker_name)
                except Exception as e:
                    print(f"Cleanup warning: {e}")

Worker Monitoring - Get a real-time view of all workers:

    from autonomy import Node, Zone


    async def list_all_workers(node):
        all_nodes = await Zone.nodes(node)

        for n in all_nodes:
            workers = await n.list_workers()
            print(f"Node {n.name}: {len(workers)} workers")
            for worker in workers:
                print(f"  - {worker}")

Resource Management - Manage limited resources across workers:
Listing Workers - Monitor active workers across your system:

    # List workers on current node
    local_workers = await node.list_workers()
    print(f"Local workers: {local_workers}")

    # List workers on all nodes
    all_nodes = await Zone.nodes(node)
    for n in all_nodes:
        workers = await n.list_workers()
        print(f"{n.name}: {workers}")

Tracking Messages - Add logging to track message flow:
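
    from datetime import datetime


    class TrackedWorker:
        async def handle_message(self, context, message):
            print(f"[{datetime.now()}] Received: {message[:100]}")

            # Process message
            result = await self.process(message)

            print(f"[{datetime.now()}] Replying: {result[:100]}")
            await context.reply(result)

        async def process(self, message):
            # Placeholder: replace with the worker's real logic
            return message
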
System Health - Monitor system health and performance: