Building Multi-Agent Systems: A Graph-Theoretic Approach¶
How network topology determines consensus speed, fault tolerance, and scalability
(Or: Why your agents are arguing forever and what to do about it)
When you're building a system where multiple agents need to coordinate—whether that's LLM agents debating an answer, robots forming a swarm, or sensors averaging measurements—the way you connect them matters more than you might think.
This guide shows you how to use spectral graph theory to make informed decisions about your agent communication architecture. Yes, I'm about to make you care about eigenvalues. Trust me, it's worth it.
By the end, you'll understand:
- Why a single number (λ₂) predicts how fast your agents reach agreement
- How to choose topologies that balance speed, cost, and robustness
- How to identify and fix bottlenecks in your agent network
Let's build some agents. They're not going to coordinate themselves. (Actually, that's literally what we're teaching them to do. But you know what I mean.)
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt
from scipy.linalg import eigh
import ipywidgets as widgets
from IPython.display import display
np.random.seed(42)
plt.style.use('seaborn-v0_8-whitegrid')
# Utility functions we'll use throughout
def get_laplacian(G):
"""Compute the graph Laplacian matrix."""
return nx.laplacian_matrix(G).toarray().astype(float)
def get_lambda2(G):
"""Compute the algebraic connectivity (Fiedler value)."""
L = get_laplacian(G)
    evals = np.linalg.eigvalsh(L)  # Laplacian is symmetric: eigvalsh returns real eigenvalues in ascending order
return evals[1] if len(evals) > 1 else 0
def get_fiedler_vector(G):
"""Compute the Fiedler vector (second eigenvector of Laplacian)."""
L = get_laplacian(G)
evals, evecs = eigh(L)
return evecs[:, 1]
print("Ready to build multi-agent systems.")
Ready to build multi-agent systems.
Part 1: The Consensus Problem¶
What Are We Solving?¶
You have n agents. Each starts with some value (an opinion, a measurement, a parameter estimate). They can only talk to their neighbors. You want them all to converge to the same value—typically the average of their initial values.
Think of it like a group project where everyone has a different idea, but instead of one person doing all the work while the others disappear, they actually communicate until they agree. Science fiction, I know.
Examples:
- LLM ensemble: 5 models vote on an answer; you want them to converge on a consensus response (without a 3-hour meeting)
- Robot swarm: 20 drones need to agree on a rendezvous point (preferably before the batteries die)
- Federated learning: 100 devices averaging model gradients
- Sensor network: 50 sensors filtering noise by averaging readings
The Update Rule¶
Each agent updates its state by moving toward the average of its neighbors:
$$\dot{x}_i = \sum_{j \in \text{neighbors}(i)} (x_j - x_i)$$
In matrix form: $\dot{\mathbf{x}} = -\mathbf{L}\mathbf{x}$, where L is the graph Laplacian.
This is beautifully simple: each agent just averages with its neighbors. No central coordinator. No global knowledge. No Slack channel with 47 unread messages. Yet they all converge to the same value.
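If you haven't met the graph Laplacian before, it's just the degree matrix minus the adjacency matrix: L = D − A. A quick standalone sanity check (not part of the notebook's utility code):

```python
import numpy as np
import networkx as nx

G = nx.path_graph(4)          # agents 0-1-2-3 in a chain
A = nx.to_numpy_array(G)      # adjacency matrix
D = np.diag(A.sum(axis=1))    # degree matrix
L = D - A                     # the graph Laplacian

# Matches networkx's built-in Laplacian
assert np.allclose(L, nx.laplacian_matrix(G).toarray())

# Rows sum to zero, so any all-equal state is a fixed point of dx/dt = -Lx:
# once the agents agree, they stay agreed.
assert np.allclose(L @ np.ones(4), 0)
print(L)
```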
def simulate_consensus(G, steps=100, dt=0.05, noise=0.0):
"""
Simulate consensus dynamics on a graph.
Each agent updates: x_i += dt * sum_neighbors(x_j - x_i) + noise
Returns: history array of shape (steps+1, n_agents)
"""
n = len(G)
L = get_laplacian(G)
# Random initial states
x = np.random.rand(n) * 10
history = [x.copy()]
for _ in range(steps):
dx = -L @ x + np.random.normal(0, noise, n)
x = x + dx * dt
history.append(x.copy())
return np.array(history)
# Quick demo: 5 agents in a path
G = nx.path_graph(5)
history = simulate_consensus(G, steps=80)
plt.figure(figsize=(10, 4))
for i in range(5):
plt.plot(history[:, i], label=f'Agent {i}')
plt.axhline(y=np.mean(history[0]), color='red', linestyle='--', label='Target (mean)')
plt.xlabel('Time steps')
plt.ylabel('Agent state')
plt.title('Consensus on a Path Graph: Agents Converge to the Mean')
plt.legend()
plt.tight_layout()
plt.show()
print(f"Initial states: {np.round(history[0], 2)}")
print(f"Final states: {np.round(history[-1], 2)}")
print(f"Target (mean): {np.mean(history[0]):.2f}")
Initial states: [3.75 9.51 7.32 5.99 1.56]
Final states: [5.95 5.84 5.63 5.42 5.28]
Target (mean): 5.62
Part 2: The Magic Number — λ₂¶
Why Some Networks Converge Faster¶
The algebraic connectivity (λ₂) is the second-smallest eigenvalue of the graph Laplacian.
Yes, I'm asking you to care about the second-smallest eigenvalue of a matrix. Stay with me.
It tells you:
| λ₂ Value | Meaning |
|---|---|
| λ₂ = 0 | Graph is disconnected — consensus impossible. Your agents are literally not talking to each other. |
| λ₂ small | Weak connectivity — slow convergence, bottlenecks exist. Like trying to spread gossip through introverts. |
| λ₂ large | Strong connectivity — fast convergence. Information superhighway. |
The Convergence Guarantee¶
The deviation from consensus decays exponentially:
$$\|x(t) - \bar{x}\| \leq e^{-\lambda_2 t} \|x(0) - \bar{x}\|$$
Translation: Higher λ₂ = faster convergence. Double λ₂ and you roughly halve the time to consensus. It's the one weird trick that network engineers don't want you to know. (They do. They definitely do.)
Let's see this in action.
def compare_topologies(n=10):
"""Compare convergence speed across different topologies."""
topologies = {
'Complete': nx.complete_graph(n),
'Star': nx.star_graph(n-1),
'Cycle': nx.cycle_graph(n),
'Path': nx.path_graph(n)
}
fig, axes = plt.subplots(2, 4, figsize=(16, 8))
for idx, (name, G) in enumerate(topologies.items()):
lambda2 = get_lambda2(G)
history = simulate_consensus(G, steps=150)
# Draw graph
pos = nx.spring_layout(G, seed=42)
nx.draw(G, pos, ax=axes[0, idx], with_labels=True,
node_color='lightblue', node_size=300, font_size=8)
axes[0, idx].set_title(f"{name}\n$\\lambda_2$ = {lambda2:.3f}")
# Plot convergence
for i in range(len(G)):
axes[1, idx].plot(history[:, i], alpha=0.6)
axes[1, idx].axhline(y=np.mean(history[0]), color='red',
linestyle='--', alpha=0.8)
axes[1, idx].set_xlabel('Time')
axes[1, idx].set_ylabel('State')
# Calculate time to 95% consensus
initial_var = np.var(history[0])
for t, h in enumerate(history):
if np.var(h) < 0.05 * initial_var:
axes[1, idx].axvline(x=t, color='green', linestyle=':', alpha=0.7)
axes[1, idx].set_title(f'95% consensus at t={t}')
break
plt.suptitle('Topology Determines Convergence Speed', fontsize=14)
plt.tight_layout()
plt.show()
compare_topologies(10)
Key Observation¶
Notice the dramatic difference:
- Complete graph (λ₂ = n): Instant convergence — everyone talks to everyone. It's like a group chat where everyone actually reads the messages.
- Star graph (λ₂ = 1): Fast — the hub broadcasts to all. Great until the hub goes on vacation.
- Cycle (λ₂ ≈ 4π²/n²): Moderate — information flows around the ring. Like passing notes in class, but eventually everyone gets the message.
- Path (λ₂ ≈ π²/n²): Slowest — information must diffuse through a chain. This is the "telephone game" topology. By the time agent 10 hears from agent 1, agent 1 has already changed their mind.
Design implication: If your agents are taking too long to agree, check your λ₂. It's probably too small. This is the graph theory equivalent of "have you tried adding more connections?"
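Those closed-form values are easy to check numerically (λ₂ = n is exact for the complete graph; the cycle and path values are large-n approximations, and the helper below is a standalone sketch):

```python
import numpy as np
import networkx as nx

def lambda2(G):
    """Algebraic connectivity: second-smallest Laplacian eigenvalue."""
    L = nx.laplacian_matrix(G).toarray().astype(float)
    return np.sort(np.linalg.eigvalsh(L))[1]

n = 50
print(f"Complete: λ₂ = {lambda2(nx.complete_graph(n)):.3f}  (exact: n = {n})")
print(f"Cycle:    λ₂ = {lambda2(nx.cycle_graph(n)):.5f}  (≈ 4π²/n² = {4 * np.pi**2 / n**2:.5f})")
print(f"Path:     λ₂ = {lambda2(nx.path_graph(n)):.5f}  (≈ π²/n² = {np.pi**2 / n**2:.5f})")
```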
Part 3: The Trade-off Triangle¶
You can't optimize everything. Every topology trades off three properties.
Remember in college when they said you can have good grades, a social life, or sleep—but pick two? Same energy here.
Fast Convergence
(high λ₂)
/\
/ \
/ \
/ \
/________\
Low Comm Cost Fault Tolerance
(sparse graph) (redundant paths)
| Topology | Speed | Comm Cost | Fault Tolerance |
|---|---|---|---|
| Complete | *** | ❌ O(n²) | *** |
| Star | ** | ✅ O(n) | ❌ Hub = SPOF |
| Ring | * | ✅ O(n) | ** |
| Path | ❌ | ✅ O(n) | ❌ |
| Small-World | ** | ✅ O(n) | ** |
The sweet spot: Small-world networks (ring + random shortcuts) give you good λ₂ with O(n) edges. It's the "I'll have a salad but also fries" of network topologies.
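Here's a small sketch of why shortcuts are such a good deal: start from a ring lattice and rewire with increasing probability p, and watch λ₂ climb while the edge count stays fixed. (This uses `connected_watts_strogatz_graph`, which retries until the result is connected; exact numbers vary with the seeds.)

```python
import numpy as np
import networkx as nx

def lambda2(G):
    """Algebraic connectivity: second-smallest Laplacian eigenvalue."""
    L = nx.laplacian_matrix(G).toarray().astype(float)
    return np.sort(np.linalg.eigvalsh(L))[1]

n, k = 30, 4
results = {}
for p in [0.0, 0.1, 0.3, 0.6]:
    # Average over a few seeds, since rewiring is random
    vals = [lambda2(nx.connected_watts_strogatz_graph(n, k, p, seed=s))
            for s in range(5)]
    results[p] = np.mean(vals)
    print(f"p = {p:.1f}: edges = {n * k // 2}, mean λ₂ = {results[p]:.3f}")
```

Same number of edges at every p; the only thing changing is where they go.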
def analyze_tradeoffs(n=15):
"""Visualize the trade-off between edges and convergence speed."""
topologies = {
'Path': nx.path_graph(n),
'Cycle': nx.cycle_graph(n),
'Star': nx.star_graph(n-1),
'Small-World (k=4, p=0.3)': nx.watts_strogatz_graph(n, 4, 0.3, seed=42),
'Random Regular (k=4)': nx.random_regular_graph(4, n, seed=42),
'Complete': nx.complete_graph(n)
}
data = []
for name, G in topologies.items():
lambda2 = get_lambda2(G)
edges = G.number_of_edges()
diameter = nx.diameter(G) if nx.is_connected(G) else float('inf')
data.append((name, lambda2, edges, diameter))
# Plot
fig, ax = plt.subplots(figsize=(10, 6))
names = [d[0] for d in data]
lambda2s = [d[1] for d in data]
edges = [d[2] for d in data]
colors = plt.cm.viridis(np.linspace(0, 1, len(data)))
for i, (name, l2, e, diam) in enumerate(data):
ax.scatter(e, l2, s=200, c=[colors[i]], label=f"{name}\n(diam={diam})")
ax.annotate(name.split()[0], (e, l2), textcoords="offset points",
xytext=(5, 5), fontsize=9)
ax.set_xlabel('Number of Edges (Communication Cost)', fontsize=12)
ax.set_ylabel('$\\lambda_2$ (Convergence Speed)', fontsize=12)
ax.set_title('The Trade-off: More Edges → Faster Convergence', fontsize=14)
# Highlight the efficient frontier
ax.axhline(y=1, color='gray', linestyle='--', alpha=0.5, label='λ₂ = 1 threshold')
plt.tight_layout()
plt.show()
# Print summary
print("\nTopology Comparison:")
print(f"{'Topology':<30} {'λ₂':>8} {'Edges':>8} {'Diameter':>10}")
print("-" * 60)
for name, l2, e, diam in data:
print(f"{name:<30} {l2:>8.3f} {e:>8} {diam:>10}")
analyze_tradeoffs(15)
Topology Comparison:
Topology                             λ₂    Edges   Diameter
------------------------------------------------------------
Path                              0.044       14         14
Cycle                             0.173       15          7
Star                              1.000       14          2
Small-World (k=4, p=0.3)          0.909       30          4
Random Regular (k=4)              1.255       30          3
Complete                         15.000      105          1
Part 4: Robustness — What Happens When Agents Fail?¶
In real systems, agents crash. Links drop. Your pager goes off at 3am. It's fine. Everything is fine.
You need to know:
- Does the network stay connected?
- How much does λ₂ degrade?
The Fiedler vector (eigenvector for λ₂) tells you where the network is weakest. Nodes with Fiedler values near zero are on the "boundary" between clusters—they're your bottlenecks. Think of them as the "if this person quits, we're screwed" nodes.
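To make that concrete, here's a minimal sketch on a barbell graph (two cliques joined by a short path) — a deliberately clustered example, not one of the topologies above. The sign of each agent's Fiedler value assigns it to a cluster, and the entries nearest zero land on the connecting path:

```python
import numpy as np
import networkx as nx
from scipy.linalg import eigh

G = nx.barbell_graph(5, 2)   # two 5-cliques (0-4 and 7-11) joined by path nodes 5, 6
L = nx.laplacian_matrix(G).toarray().astype(float)
evals, evecs = eigh(L)
fiedler = evecs[:, 1]        # eigenvector for λ₂

# Sign of the Fiedler value partitions the graph into its two clusters
left = [v for v in G.nodes() if fiedler[v] < 0]
right = [v for v in G.nodes() if fiedler[v] >= 0]
print("cluster A:", left)
print("cluster B:", right)

# Entries nearest zero sit on the bottleneck between the clusters
bottleneck = sorted(G.nodes(), key=lambda v: abs(fiedler[v]))[:2]
print("bottleneck nodes:", bottleneck)
```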
def analyze_fault_tolerance(topology='Star', n=12):
"""Analyze how node failures affect network connectivity."""
# Create graph
if topology == 'Star':
G = nx.star_graph(n-1)
elif topology == 'Ring':
G = nx.cycle_graph(n)
elif topology == 'Small-World':
G = nx.watts_strogatz_graph(n, 4, 0.3, seed=42)
else:
G = nx.random_regular_graph(3, n, seed=42)
original_lambda2 = get_lambda2(G)
fiedler = get_fiedler_vector(G)
# Find most critical node (closest to Fiedler boundary)
critical_node = np.argmin(np.abs(fiedler))
# Simulate failure of each node
failure_impact = []
for node in G.nodes():
G_failed = G.copy()
G_failed.remove_node(node)
if nx.is_connected(G_failed):
new_lambda2 = get_lambda2(G_failed)
failure_impact.append((node, new_lambda2, True))
else:
failure_impact.append((node, 0, False))
# Visualization
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
# Original graph colored by Fiedler vector
pos = nx.spring_layout(G, seed=42)
vmax = max(abs(fiedler.min()), abs(fiedler.max()))
nodes = nx.draw_networkx_nodes(G, pos, ax=axes[0], node_color=fiedler,
cmap='RdYlBu', node_size=400,
vmin=-vmax, vmax=vmax)
nx.draw_networkx_edges(G, pos, ax=axes[0], edge_color='gray')
nx.draw_networkx_labels(G, pos, ax=axes[0], font_size=8)
plt.colorbar(nodes, ax=axes[0], label='Fiedler value')
axes[0].set_title(f"{topology} Graph\nλ₂ = {original_lambda2:.3f}\n(colors = Fiedler vector)")
axes[0].axis('off')
# Impact of each node failure
nodes_list = [f[0] for f in failure_impact]
lambda2s = [f[1] for f in failure_impact]
connected = [f[2] for f in failure_impact]
colors = ['green' if c else 'red' for c in connected]
axes[1].bar(nodes_list, lambda2s, color=colors, alpha=0.7)
axes[1].axhline(y=original_lambda2, color='blue', linestyle='--',
label=f'Original λ₂ = {original_lambda2:.3f}')
axes[1].set_xlabel('Failed Node')
axes[1].set_ylabel('Remaining λ₂')
axes[1].set_title('Impact of Single Node Failure\n(red = disconnected)')
axes[1].legend()
# Show worst-case failure
worst_node = min(failure_impact, key=lambda x: x[1])[0]
G_worst = G.copy()
G_worst.remove_node(worst_node)
if nx.is_connected(G_worst):
pos_worst = nx.spring_layout(G_worst, seed=42)
nx.draw(G_worst, pos_worst, ax=axes[2], with_labels=True,
node_color='lightyellow', node_size=400)
new_l2 = get_lambda2(G_worst)
axes[2].set_title(f"After Node {worst_node} Fails\nλ₂ = {new_l2:.3f}")
else:
# Show disconnected components
components = list(nx.connected_components(G_worst))
pos_worst = nx.spring_layout(G_worst, seed=42)
colors = ['lightcoral' if n in components[0] else 'lightblue'
for n in G_worst.nodes()]
nx.draw(G_worst, pos_worst, ax=axes[2], with_labels=True,
node_color=colors, node_size=400)
axes[2].set_title(f"After Node {worst_node} Fails\nDISCONNECTED!")
axes[2].axis('off')
plt.tight_layout()
plt.show()
# Summary
disconnecting_nodes = [f[0] for f in failure_impact if not f[2]]
print(f"\nFault Tolerance Analysis for {topology}:")
print(f" Original λ₂: {original_lambda2:.4f}")
print(f" Nodes that disconnect the graph: {disconnecting_nodes if disconnecting_nodes else 'None'}")
print(f" Most critical node: {worst_node}")
# Interactive exploration
widgets.interact(analyze_fault_tolerance,
topology=['Star', 'Ring', 'Small-World', 'Random-Regular'],
n=widgets.IntSlider(min=8, max=20, value=12));
Key Insights¶
- Star topology: The hub (node 0) is a single point of failure. Remove it and the network shatters into lonely, disconnected agents wondering what happened. This is the "entire company depends on one senior engineer" architecture.
- Ring topology: Any single failure keeps the network connected (becomes a path), but λ₂ drops significantly. Survivable, but now everyone's talking through a chain of whispers.
- Small-world/Random-regular: Multiple redundant paths. Most single failures barely affect λ₂. This is the "we actually did succession planning" topology.
Design rule: If fault tolerance matters, ensure no single node/edge is a bridge. The Fiedler vector highlights your vulnerabilities—it's basically a "who should we give a raise to so they don't quit" detector.
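As a sketch of that design rule: build a deliberately fragile network (two rings joined by one edge), let `nx.bridges` find the weak link, then add a single redundant cross-link and watch the bridge disappear and λ₂ recover. The specific node pairs here are arbitrary choices for illustration:

```python
import numpy as np
import networkx as nx

def lambda2(G):
    """Algebraic connectivity: second-smallest Laplacian eigenvalue."""
    L = nx.laplacian_matrix(G).toarray().astype(float)
    return np.sort(np.linalg.eigvalsh(L))[1]

# Two 6-cycles (nodes 0-5 and 6-11) joined by a single bridge edge
G = nx.disjoint_union(nx.cycle_graph(6), nx.cycle_graph(6))
G.add_edge(0, 6)                                  # the lone bridge
l2_before = lambda2(G)
print("bridges before:", list(nx.bridges(G)), f"| λ₂ = {l2_before:.3f}")

G.add_edge(3, 9)                                  # one redundant cross-link
l2_after = lambda2(G)
print("bridges after: ", list(nx.bridges(G)), f"| λ₂ = {l2_after:.3f}")
```

One extra edge removes the single point of failure entirely, because every edge now lies on a cycle.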
Part 5: Noise Resilience¶
Real agents have noisy observations. Sensors drift. LLMs hallucinate. Robots bump into things. How does topology affect the system's ability to filter out the chaos?
With additive noise: $$\dot{x}_i = \sum_{j \in N_i} (x_j - x_i) + \xi_i(t)$$
The variance of deviation from consensus scales as σ² / λ₂.
Translation: Higher λ₂ = better noise rejection. Dense networks act as low-pass filters, smoothing out the "that one agent who's clearly wrong" problem. It's peer pressure, but make it math.
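A rough empirical check of that scaling (a standalone sketch; σ²/λ₂ is an order-of-magnitude guide, not an exact constant): run noisy consensus to steady state on a dense and a sparse topology and compare the residual cross-agent variance.

```python
import numpy as np
import networkx as nx

def lambda2(G):
    """Algebraic connectivity: second-smallest Laplacian eigenvalue."""
    L = nx.laplacian_matrix(G).toarray().astype(float)
    return np.sort(np.linalg.eigvalsh(L))[1]

def steady_state_var(G, sigma=0.5, dt=0.05, steps=4000, seed=0):
    """Run noisy consensus, then average the cross-agent variance
    over the second half of the run (after transients die out)."""
    rng = np.random.default_rng(seed)
    L = nx.laplacian_matrix(G).toarray().astype(float)
    x = rng.random(len(G)) * 10
    tail = []
    for t in range(steps):
        x = x + dt * (-L @ x + rng.normal(0, sigma, len(G)))
        if t > steps // 2:
            tail.append(np.var(x))
    return np.mean(tail)

results = {}
for name, G in [('Complete', nx.complete_graph(12)), ('Path', nx.path_graph(12))]:
    results[name] = steady_state_var(G)
    print(f"{name:>8}: λ₂ = {lambda2(G):6.3f}, steady-state variance ≈ {results[name]:.4f}")
```

The complete graph's large λ₂ crushes the noise; the path's tiny λ₂ lets it linger.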
def compare_noise_resilience(noise_level=0.5):
"""Compare how different topologies handle noisy agents."""
n = 12
topologies = {
'Complete': nx.complete_graph(n),
'Small-World': nx.watts_strogatz_graph(n, 4, 0.3, seed=42),
'Ring': nx.cycle_graph(n),
'Path': nx.path_graph(n)
}
fig, axes = plt.subplots(2, 4, figsize=(16, 8))
for idx, (name, G) in enumerate(topologies.items()):
lambda2 = get_lambda2(G)
history = simulate_consensus(G, steps=200, noise=noise_level)
# Graph
pos = nx.spring_layout(G, seed=42)
nx.draw(G, pos, ax=axes[0, idx], with_labels=False,
node_color='lightblue', node_size=200)
axes[0, idx].set_title(f"{name}\nλ₂ = {lambda2:.3f}")
# Convergence with noise
for i in range(len(G)):
axes[1, idx].plot(history[:, i], alpha=0.5)
axes[1, idx].axhline(y=np.mean(history[0]), color='red',
linestyle='--', alpha=0.8)
# Calculate final variance
final_var = np.mean([np.var(h) for h in history[-50:]])  # mean cross-agent variance over last 50 steps
axes[1, idx].set_title(f'Final variance: {final_var:.3f}')
axes[1, idx].set_xlabel('Time')
axes[1, 0].set_ylabel('Agent State')
plt.suptitle(f'Noise Resilience Comparison (noise σ = {noise_level})', fontsize=14)
plt.tight_layout()
plt.show()
widgets.interact(compare_noise_resilience,
noise_level=widgets.FloatSlider(min=0.0, max=1.0, step=0.1, value=0.5));
Part 6: Practical Design Guide¶
Decision Framework¶
Okay, enough theory. You have agents to ship. Here's the cheat sheet:
| Your Priority | Recommended Topology | Why |
|---|---|---|
| Speed above all | Complete or near-complete | Maximum λ₂. Your AWS bill will also be maximum. |
| Limited bandwidth | Random regular (k=3 or 4) | Good λ₂ with O(n) edges. The sensible choice. |
| Fault tolerance | Small-world or random regular | Redundant paths, no SPOF. Sleep through the night. |
| Hierarchical control | Tree + shortcuts | Be aware of low λ₂ at bottlenecks. Middle management problems, but in graph form. |
| Geographic constraints | Grid + diagonal links | Natural for physical layouts. Also looks cool on architecture diagrams. |
Quick Checks (a.k.a. "Is My Network Broken?")¶
- Is λ₂ > 0? If not, your network is disconnected. Your agents are literally ghosting each other.
- Is λ₂ > 1? If not, convergence will be slow. Time to add some edges.
- Does removing any single node disconnect the graph? If yes, you have a SPOF. Fix it before it fixes your weekend plans.
- Is the Fiedler vector bimodal? If yes, you have a natural partition (potential bottleneck). Your network has cliques, and not the good kind.
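For the bimodality check, you don't even need a histogram: sort the Fiedler values and look for one dominant gap. This sketch compares a barbell graph (two obvious clusters) with a path graph (no clusters); the gap-ratio statistic is ad hoc, just a quick smell test:

```python
import numpy as np
import networkx as nx
from scipy.linalg import eigh

def fiedler_gap_ratio(G):
    """Largest gap between consecutive sorted Fiedler values, relative
    to the total spread. Near 1 suggests two tight clusters."""
    L = nx.laplacian_matrix(G).toarray().astype(float)
    _, evecs = eigh(L)
    f = np.sort(evecs[:, 1])
    return np.max(np.diff(f)) / (f[-1] - f[0])

print(f"barbell (two clusters): {fiedler_gap_ratio(nx.barbell_graph(6, 0)):.2f}")
print(f"path (no clusters):     {fiedler_gap_ratio(nx.path_graph(12)):.2f}")
```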
def design_checker(n_agents=15, topology='Small-World'):
"""Interactive tool to evaluate your agent network design."""
# Create graph
if topology == 'Complete':
G = nx.complete_graph(n_agents)
elif topology == 'Star':
G = nx.star_graph(n_agents - 1)
elif topology == 'Ring':
G = nx.cycle_graph(n_agents)
elif topology == 'Path':
G = nx.path_graph(n_agents)
elif topology == 'Small-World':
G = nx.watts_strogatz_graph(n_agents, 4, 0.3, seed=42)
elif topology == 'Random-Regular-3':
G = nx.random_regular_graph(3, n_agents, seed=42)
else: # Grid
side = int(np.sqrt(n_agents))
G = nx.grid_2d_graph(side, side)
G = nx.convert_node_labels_to_integers(G)
# Compute metrics
lambda2 = get_lambda2(G)
edges = G.number_of_edges()
diameter = nx.diameter(G) if nx.is_connected(G) else float('inf')
# Check for single points of failure
articulation_points = list(nx.articulation_points(G))
bridges = list(nx.bridges(G))
# Visualization
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# Draw graph with articulation points highlighted
pos = nx.spring_layout(G, seed=42)
node_colors = ['red' if n in articulation_points else 'lightblue' for n in G.nodes()]
nx.draw(G, pos, ax=axes[0], with_labels=True, node_color=node_colors,
node_size=400, font_size=8)
axes[0].set_title(f"{topology} (n={len(G)})\nRed = Single Points of Failure")
# Consensus simulation
history = simulate_consensus(G, steps=100)
for i in range(len(G)):
axes[1].plot(history[:, i], alpha=0.6)
axes[1].axhline(y=np.mean(history[0]), color='red', linestyle='--')
axes[1].set_xlabel('Time')
axes[1].set_ylabel('Agent State')
axes[1].set_title('Consensus Dynamics')
plt.tight_layout()
plt.show()
# Report card
print("\n" + "="*50)
print(f"DESIGN REPORT: {topology}")
print("="*50)
print(f"\nAgents: {len(G)}")
print(f"Edges: {edges} (communication cost per round)")
print(f"")
# Speed check
print(f"SPEED:")
print(f" λ₂ = {lambda2:.4f}", end="")
if lambda2 >= len(G) * 0.5:
print(" ✅ Excellent")
elif lambda2 >= 1:
print(" ✅ Good")
elif lambda2 > 0:
print(" ⚠️ Slow convergence expected")
else:
print(" ❌ DISCONNECTED")
print(f" Diameter = {diameter} hops")
print(f"")
# Fault tolerance check
print(f"FAULT TOLERANCE:")
if len(articulation_points) == 0:
print(f" ✅ No single points of failure")
else:
print(f" ⚠️ Articulation points (SPOFs): {articulation_points}")
if len(bridges) == 0:
print(f" ✅ No bridge edges")
else:
print(f" ⚠️ Bridge edges: {len(bridges)}")
print(f"")
# Efficiency check
print(f"EFFICIENCY:")
max_edges = len(G) * (len(G) - 1) // 2
density = edges / max_edges
print(f" Edge density: {density:.1%}")
print(f" λ₂ per edge: {lambda2 / edges:.4f}")
widgets.interact(design_checker,
n_agents=widgets.IntSlider(min=8, max=25, value=15),
topology=['Complete', 'Star', 'Ring', 'Path',
'Small-World', 'Random-Regular-3', 'Grid']);
Part 7: Building Your Agent Network¶
Here's a template for implementing consensus in your own multi-agent system.
Copy it. Modify it. Ship it. Take credit at the standup.
class MultiAgentSystem:
"""
A simple multi-agent consensus system.
Usage:
mas = MultiAgentSystem(n_agents=10, topology='small-world')
mas.set_initial_states([1, 2, 3, ...]) # or random
mas.run(steps=100)
mas.plot_history()
"""
def __init__(self, n_agents=10, topology='small-world', k=4, p=0.3):
self.n = n_agents
self.topology = topology
# Build communication graph
if topology == 'complete':
self.G = nx.complete_graph(n_agents)
elif topology == 'ring':
self.G = nx.cycle_graph(n_agents)
elif topology == 'star':
self.G = nx.star_graph(n_agents - 1)
elif topology == 'small-world':
self.G = nx.connected_watts_strogatz_graph(n_agents, k, p)  # retries rewiring until connected
elif topology == 'random-regular':
self.G = nx.random_regular_graph(k, n_agents)
else:
raise ValueError(f"Unknown topology: {topology}")
self.L = get_laplacian(self.G)
self.lambda2 = get_lambda2(self.G)
self.states = None
self.history = []
def set_initial_states(self, states=None):
"""Set initial agent states. If None, use random."""
if states is None:
self.states = np.random.rand(self.n) * 10
else:
self.states = np.array(states)
self.history = [self.states.copy()]
def step(self, dt=0.05, noise=0.0):
"""Execute one consensus update step."""
dx = -self.L @ self.states
if noise > 0:
dx += np.random.normal(0, noise, self.n)
self.states = self.states + dx * dt
self.history.append(self.states.copy())
def run(self, steps=100, dt=0.05, noise=0.0):
"""Run consensus for multiple steps."""
if self.states is None:
self.set_initial_states()
for _ in range(steps):
self.step(dt, noise)
def get_consensus_value(self):
"""Return the theoretical consensus value (mean of initial states)."""
return np.mean(self.history[0])
def get_convergence_time(self, threshold=0.05):
"""Return time to reach threshold% of initial variance."""
initial_var = np.var(self.history[0])
for t, h in enumerate(self.history):
if np.var(h) < threshold * initial_var:
return t
return len(self.history)
def plot_history(self):
"""Visualize the consensus process."""
history = np.array(self.history)
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# Graph
pos = nx.spring_layout(self.G, seed=42)
nx.draw(self.G, pos, ax=axes[0], with_labels=True,
node_color='lightblue', node_size=400)
axes[0].set_title(f"{self.topology} (n={self.n})\nλ₂ = {self.lambda2:.4f}")
# Convergence
for i in range(self.n):
axes[1].plot(history[:, i], alpha=0.7)
axes[1].axhline(y=self.get_consensus_value(), color='red',
linestyle='--', label='Consensus')
axes[1].set_xlabel('Time')
axes[1].set_ylabel('State')
axes[1].set_title(f'Convergence (95% at t={self.get_convergence_time()})')
axes[1].legend()
plt.tight_layout()
plt.show()
def report(self):
"""Print a summary of the system."""
print(f"Multi-Agent System Report")
print(f" Topology: {self.topology}")
print(f" Agents: {self.n}")
print(f" Edges: {self.G.number_of_edges()}")
print(f" λ₂: {self.lambda2:.4f}")
print(f"  Diameter: {nx.diameter(self.G) if nx.is_connected(self.G) else 'inf (disconnected)'}")
print(f" Articulation points: {list(nx.articulation_points(self.G))}")
# Demo
mas = MultiAgentSystem(n_agents=12, topology='small-world')
mas.run(steps=100)
mas.plot_history()
mas.report()
Multi-Agent System Report
  Topology: small-world
  Agents: 12
  Edges: 24
  λ₂: 1.5310
  Diameter: 3
  Articulation points: []
Summary: What You Need to Remember¶
(Print this out. Tape it to your monitor. Tattoo it on your forearm. Whatever works.)
The One Number That Matters¶
λ₂ (algebraic connectivity) predicts:
- Convergence speed: $\sim e^{-\lambda_2 t}$
- Noise resilience: variance $\sim \sigma^2 / \lambda_2$
- Connectivity strength: λ₂ = 0 means disconnected (your agents have given up on each other)
The Trade-offs¶
| Want This? | Do This | Accept This |
|---|---|---|
| Fast consensus | Add edges | Higher comm cost (and complexity) |
| Low comm cost | Sparse graph | Slower consensus (patience is a virtue) |
| Fault tolerance | Redundant paths | More edges (redundancy isn't free) |
Quick Design Rules¶
- Start with small-world or random-regular — good balance of speed and efficiency. The Toyota Camry of network topologies.
- Check for articulation points — these are your single points of failure. Name them. Know them. Fear them.
- Aim for λ₂ > 1 — below this, convergence gets noticeably slow. Your agents will be arguing until the heat death of the universe.
- Use the Fiedler vector — it shows you where your network is weakest. It's like a network MRI.
Code You Can Copy¶
import networkx as nx
import numpy as np
# Create your agent network
G = nx.watts_strogatz_graph(n=20, k=4, p=0.3)
# Check λ₂
L = nx.laplacian_matrix(G).toarray().astype(float)
lambda2 = np.sort(np.linalg.eigvalsh(L))[1]  # eigvalsh: real, sorted eigenvalues for symmetric L
print(f"λ₂ = {lambda2:.4f}")
# Check for single points of failure
spofs = list(nx.articulation_points(G))
print(f"SPOFs: {spofs}")
Now go build your multi-agent system. The math is on your side. The eigenvalues believe in you. 🚀