Harnessing OpenAI Swarm for Complex SaaS Automations
Is multi-agent orchestration the next frontier? Learn how to implement OpenAI Swarm patterns in your SaaS backend to handle non-linear customer success and sales workflows.
What You’ll Build
In this guide, you will build a multi-agent customer success system inspired by the OpenAI Swarm pattern. By the end, you will have:
- A Router Agent that analyzes incoming customer requests and intelligently dispatches them to the right specialist
- A Technical Troubleshooter Agent that searches documentation and logs to resolve issues
- A Sales Upseller Agent that identifies upgrade opportunities during support interactions
- A Billing Support Agent that handles invoice and payment questions
- A handoff mechanism that allows agents to transfer conversations seamlessly, preserving full context
- A REST API wrapper so you can plug the entire system into your existing SaaS backend
The result is a customer success pipeline that feels proactive rather than reactive, handling non-linear workflows that are impractical to model with traditional if-this-then-that automation.
Prerequisites
| Requirement | Details |
|---|---|
| Python | Version 3.10 or higher (we use match statements and modern async features) |
| OpenAI API Key | An active key with access to gpt-4o or gpt-4o-mini |
| Async Python | Basic familiarity with asyncio, async/await, and type hints |
| Package Manager | pip or uv for dependency management |
| Optional | A running SaaS backend with REST endpoints for live integration |
Architecture Overview
The system follows a hub-and-spoke architecture. The Router Agent sits at the center, receiving every inbound customer message. It classifies the intent and hands the conversation off to the appropriate specialist agent. Each specialist can either resolve the request or hand it back to the router for re-classification.
Customer Message
│
▼
┌──────────────┐
│ Router Agent │ ← Classifies intent
└──────┬───────┘
│
┌────┼─────────────┐
▼ ▼ ▼
┌────┐ ┌──────┐ ┌─────────┐
│Tech│ │Sales │ │ Billing │
│ │ │Upsell│ │ Support │
└────┘ └──────┘ └─────────┘
Every agent shares a common ConversationContext object. When a handoff occurs, the full history travels with it, so the receiving agent has complete awareness of what has already been discussed.
Step 1: Understanding the Swarm Pattern
Traditional chatbot architectures use a single monolithic prompt that tries to handle every scenario. This approach breaks down quickly in SaaS environments where customer interactions span billing, technical support, account management, and sales in a single conversation.
The Swarm pattern solves this by decomposing the problem into specialized agents, each with a focused system prompt, a targeted tool set, and clear boundaries for what it can and cannot handle.
Multi-Agent Orchestration Concepts
There are three core principles:
- Single Responsibility — Each agent does one thing well. The troubleshooter never discusses pricing; the upseller never debugs API errors.
- Explicit Handoff — Agents do not silently redirect. They produce a structured handoff object that the orchestrator interprets, ensuring auditability.
- Shared Context — All agents read from and write to the same conversation history, so the customer never has to repeat themselves.
Agent Handoff Mechanism
A handoff is a structured signal from one agent to the orchestrator. It contains:
- target_agent — which specialist should take over
- reason — why the handoff is happening (used for logging and analytics)
- context_update — any new information the current agent discovered that the next agent needs
The orchestrator receives this signal, updates the shared context, and invokes the target agent on the next turn.
Key Concept: The Swarm pattern is not about agents talking to each other directly. It is about agents communicating through a central orchestrator that manages state, enforces routing rules, and maintains a complete audit trail.
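To make that concrete before any real code: the orchestrator's dispatch loop can be sketched in a few lines of stdlib Python. The stub agents and names below are illustrative only; the real implementation arrives in Step 5.

```python
from dataclasses import dataclass

@dataclass
class HandoffSignal:
    target_agent: str
    reason: str

def run_turn(agents: dict, current: str, message: str, max_handoffs: int = 5):
    """Dispatch a message, following handoff signals until an agent answers."""
    for _ in range(max_handoffs):
        result = agents[current](message)
        if isinstance(result, HandoffSignal):
            current = result.target_agent  # follow the handoff
            continue
        return current, result  # an agent produced a final text reply
    return current, "Escalating to a human agent."

# Stub agents: the router always hands off, the specialist answers.
agents = {
    "router": lambda m: HandoffSignal("billing", "invoice question"),
    "billing": lambda m: "Your last invoice was $99.00.",
}
# run_turn(agents, "router", "Why was I charged?") follows one handoff,
# then returns the billing agent's reply
```

The cap on handoffs is the same safety idea as the MAX_HANDOFFS_PER_TURN limit introduced in Step 2.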
Step 2: Set Up the Project
Start by creating the project structure and installing dependencies.
# Install dependencies
# pip install openai pydantic fastapi uvicorn
# Project structure:
# swarm_saas/
# ├── main.py # FastAPI entry point
# ├── orchestrator.py # Central orchestration engine
# ├── agents/
# │ ├── __init__.py
# │ ├── base.py # Base agent class
# │ ├── router.py # Router agent
# │ ├── troubleshooter.py
# │ ├── upseller.py
# │ └── billing.py
# ├── models.py # Pydantic data models
# └── config.py # Configuration and prompts
Define the shared data models that every agent will use:
# models.py
from pydantic import BaseModel, Field
from enum import Enum
from datetime import datetime
class AgentType(str, Enum):
ROUTER = "router"
TROUBLESHOOTER = "troubleshooter"
UPSELLER = "upseller"
BILLING = "billing"
class Message(BaseModel):
role: str
content: str
agent: AgentType | None = None
timestamp: datetime = Field(default_factory=datetime.utcnow)
class CustomerProfile(BaseModel):
customer_id: str
name: str
plan: str = "free"
mrr: float = 0.0
open_tickets: int = 0
last_login: datetime | None = None
tags: list[str] = Field(default_factory=list)
class HandoffSignal(BaseModel):
target_agent: AgentType
reason: str
context_update: dict = Field(default_factory=dict)
class ConversationContext(BaseModel):
conversation_id: str
customer: CustomerProfile
messages: list[Message] = Field(default_factory=list)
current_agent: AgentType = AgentType.ROUTER
metadata: dict = Field(default_factory=dict)
handoff_history: list[HandoffSignal] = Field(default_factory=list)
def add_message(self, role: str, content: str, agent: AgentType | None = None):
self.messages.append(Message(role=role, content=content, agent=agent))
def get_history_for_prompt(self) -> list[dict]:
return [{"role": m.role, "content": m.content} for m in self.messages]
Set up the configuration file:
# config.py
import os
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
MODEL = "gpt-4o"
TEMPERATURE = 0.3
MAX_HANDOFFS_PER_TURN = 5 # Safety limit to prevent infinite loops
Expected Result: You should have a clean project directory with all data models defined. Run python -c "from models import ConversationContext; print('Models OK')" to verify everything imports correctly.
Step 3: Build the Router Agent
The Router Agent is the entry point for every customer message. Its job is to classify intent and produce a handoff signal.
# agents/base.py
from abc import ABC, abstractmethod
from openai import AsyncOpenAI
from models import ConversationContext, HandoffSignal, AgentType
import config
import json
client = AsyncOpenAI(api_key=config.OPENAI_API_KEY)
class BaseAgent(ABC):
agent_type: AgentType
system_prompt: str
@abstractmethod
async def process(self, context: ConversationContext) -> str | HandoffSignal:
"""Process a message and return a response or a handoff signal."""
...
async def call_llm(
self,
context: ConversationContext,
tools: list[dict] | None = None,
):  # returns the SDK's ChatCompletionMessage, not a plain dict
messages = [
{"role": "system", "content": self.system_prompt},
*context.get_history_for_prompt(),
]
kwargs = {
"model": config.MODEL,
"messages": messages,
"temperature": config.TEMPERATURE,
}
if tools:
kwargs["tools"] = tools
kwargs["tool_choice"] = "auto"
response = await client.chat.completions.create(**kwargs)
return response.choices[0].message
Now build the Router itself:
# agents/router.py
import json
from agents.base import BaseAgent
from models import ConversationContext, HandoffSignal, AgentType
ROUTER_TOOLS = [
{
"type": "function",
"function": {
"name": "route_to_agent",
"description": "Route the customer to a specialist agent based on their intent.",
"parameters": {
"type": "object",
"properties": {
"target_agent": {
"type": "string",
"enum": ["troubleshooter", "upseller", "billing"],
"description": "The specialist agent to hand off to.",
},
"reason": {
"type": "string",
"description": "Brief explanation of why this routing was chosen.",
},
},
"required": ["target_agent", "reason"],
},
},
}
]
class RouterAgent(BaseAgent):
agent_type = AgentType.ROUTER
system_prompt = """You are a customer success router for a SaaS platform.
Your ONLY job is to understand the customer's intent and route them to the correct specialist:
- "troubleshooter": Technical issues, bugs, API errors, integration problems, feature questions
- "upseller": Upgrade inquiries, plan comparisons, feature requests for higher tiers, pricing
- "billing": Invoice questions, payment failures, refund requests, subscription changes
Analyze the customer's message carefully. If the intent is ambiguous, ask ONE clarifying question.
Always use the route_to_agent tool when you have identified the intent.
Customer profile is provided for context. Use it to inform routing decisions."""
async def process(self, context: ConversationContext) -> str | HandoffSignal:
# Inject customer profile into the conversation context
profile_msg = (
f"[Customer Profile] ID: {context.customer.customer_id}, "
f"Plan: {context.customer.plan}, MRR: ${context.customer.mrr}, "
f"Open Tickets: {context.customer.open_tickets}, "
f"Tags: {', '.join(context.customer.tags) or 'none'}"
)
from models import Message
# Deep copy: model_copy() is shallow by default, so inserting into
# augmented_context.messages would otherwise mutate the shared history
augmented_context = context.model_copy(deep=True)
augmented_context.messages.insert(
0,
Message(role="system", content=profile_msg, agent=AgentType.ROUTER),
)
response = await self.call_llm(augmented_context, tools=ROUTER_TOOLS)
# Check if the model wants to call the routing tool
if response.tool_calls:
tool_call = response.tool_calls[0]
args = json.loads(tool_call.function.arguments)
return HandoffSignal(
target_agent=AgentType(args["target_agent"]),
reason=args["reason"],
)
# If no tool call, the router is asking a clarifying question
return response.content
Expected Result: The Router Agent receives a customer message like “My API calls are returning 500 errors” and produces a
HandoffSignal(target_agent=AgentType.TROUBLESHOOTER, reason="Customer reporting API 500 errors"). For an ambiguous message like “I need help with my account,” it asks a clarifying question instead.
Step 4: Build Specialist Agents
Each specialist agent has a focused system prompt and domain-specific tools.
Technical Troubleshooter Agent
# agents/troubleshooter.py
import json
from agents.base import BaseAgent
from models import ConversationContext, HandoffSignal, AgentType
TROUBLESHOOTER_TOOLS = [
{
"type": "function",
"function": {
"name": "search_knowledge_base",
"description": "Search the technical documentation and known issues database.",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query for the knowledge base.",
}
},
"required": ["query"],
},
},
},
{
"type": "function",
"function": {
"name": "check_service_status",
"description": "Check the current status of a specific service or API endpoint.",
"parameters": {
"type": "object",
"properties": {
"service_name": {
"type": "string",
"description": "Name of the service to check.",
}
},
"required": ["service_name"],
},
},
},
{
"type": "function",
"function": {
"name": "escalate_to_human",
"description": "Escalate the ticket to a human support engineer.",
"parameters": {
"type": "object",
"properties": {
"severity": {
"type": "string",
"enum": ["low", "medium", "high", "critical"],
},
"summary": {"type": "string"},
},
"required": ["severity", "summary"],
},
},
},
]
class TroubleshooterAgent(BaseAgent):
agent_type = AgentType.TROUBLESHOOTER
system_prompt = """You are a senior technical support engineer for a SaaS platform.
Your responsibilities:
1. Diagnose technical issues reported by customers
2. Search the knowledge base for known solutions
3. Check service status when outages are suspected
4. Provide clear, step-by-step resolution instructions
5. Escalate to human engineers when the issue is beyond automated resolution
Guidelines:
- Always check service status if the customer reports errors or downtime
- Search the knowledge base before providing custom troubleshooting steps
- If you cannot resolve the issue in 3 exchanges, escalate to a human
- Be empathetic but concise
- If the customer mentions pricing or upgrades, signal a handoff to the upseller agent"""
async def process(self, context: ConversationContext) -> str | HandoffSignal:
response = await self.call_llm(context, tools=TROUBLESHOOTER_TOOLS)
if response.tool_calls:
tool_call = response.tool_calls[0]
fn_name = tool_call.function.name
args = json.loads(tool_call.function.arguments)
match fn_name:
case "search_knowledge_base":
result = await self._search_kb(args["query"])
context.add_message("assistant", f"[Searching KB: {args['query']}]")
context.add_message("system", f"KB Result: {result}")
return await self.process(context) # Re-process with KB result
case "check_service_status":
result = await self._check_status(args["service_name"])
context.add_message("system", f"Service Status: {result}")
return await self.process(context)
case "escalate_to_human":
context.metadata["escalation"] = {
"severity": args["severity"],
"summary": args["summary"],
}
return f"I've escalated your issue to our engineering team with {args['severity']} priority. A human engineer will follow up within the next hour. Your reference number is #{context.conversation_id[:8].upper()}."
# Check if the response suggests a handoff
content = response.content or ""
if any(kw in content.lower() for kw in ["pricing", "upgrade", "plan change"]):
return HandoffSignal(
target_agent=AgentType.UPSELLER,
reason="Customer expressed interest in pricing or upgrades during troubleshooting.",
)
return content
async def _search_kb(self, query: str) -> str:
# In production, this calls your actual knowledge base API
# For now, return a simulated result
return f"Found 3 articles matching '{query}'. Top result: Check API rate limits and ensure authentication tokens are not expired."
async def _check_status(self, service_name: str) -> str:
# In production, integrate with your status page API
return f"Service '{service_name}' is operational. No incidents reported in the last 24 hours."
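One caveat: the recursive return await self.process(context) calls above have no depth guard, so a model that keeps requesting tools would recurse indefinitely. An explicit bounded loop is a safer shape; here is a standalone sketch with stubbed model and tool functions (all names hypothetical, not part of the agent classes above):

```python
import asyncio

async def run_with_tools(call_model, tools: dict, max_tool_calls: int = 4) -> str:
    """Call the model, executing requested tools, until it answers or the budget runs out."""
    observations: list[str] = []
    for _ in range(max_tool_calls):
        result = await call_model(observations)
        if isinstance(result, str):
            return result  # final text answer
        name, args = result  # model requested a tool call
        observations.append(await tools[name](**args))
    return "I couldn't resolve this automatically; escalating to a human engineer."

# Stubs: the "model" asks for one status check, then answers using the result.
async def fake_model(obs):
    return ("check_status", {"service": "api"}) if not obs else f"Diagnosis: {obs[0]}"

async def fake_status(service):
    return f"{service} operational"

# asyncio.run(run_with_tools(fake_model, {"check_status": fake_status}))
# returns a diagnosis after exactly one tool round-trip
```

The max_tool_calls budget plays the same role as the "escalate after 3 exchanges" rule in the troubleshooter's system prompt.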
Sales Upseller Agent
# agents/upseller.py
import json
from agents.base import BaseAgent
from models import ConversationContext, HandoffSignal, AgentType
UPSELLER_TOOLS = [
{
"type": "function",
"function": {
"name": "get_plan_comparison",
"description": "Retrieve a comparison between the customer's current plan and a target plan.",
"parameters": {
"type": "object",
"properties": {
"current_plan": {"type": "string"},
"target_plan": {"type": "string"},
},
"required": ["current_plan", "target_plan"],
},
},
},
{
"type": "function",
"function": {
"name": "create_upgrade_offer",
"description": "Generate a personalized upgrade offer for the customer.",
"parameters": {
"type": "object",
"properties": {
"target_plan": {"type": "string"},
"discount_percent": {"type": "integer", "minimum": 0, "maximum": 30},
"trial_days": {"type": "integer", "minimum": 0, "maximum": 30},
},
"required": ["target_plan"],
},
},
},
]
class UpsellerAgent(BaseAgent):
agent_type = AgentType.UPSELLER
system_prompt = """You are a consultative sales specialist for a SaaS platform.
Your approach:
1. Understand the customer's needs and pain points FIRST
2. Map their needs to specific features in higher-tier plans
3. Present value before price
4. Offer trials or discounts when appropriate
5. Never be pushy -- if the customer is not interested, respect that
Available plans: Free, Starter ($29/mo), Professional ($99/mo), Enterprise ($299/mo)
Guidelines:
- Reference the customer's profile to personalize recommendations
- If the customer has technical issues, hand off to the troubleshooter
- Always frame upgrades in terms of business value, not features"""
async def process(self, context: ConversationContext) -> str | HandoffSignal:
response = await self.call_llm(context, tools=UPSELLER_TOOLS)
if response.tool_calls:
tool_call = response.tool_calls[0]
fn_name = tool_call.function.name
args = json.loads(tool_call.function.arguments)
match fn_name:
case "get_plan_comparison":
comparison = self._compare_plans(args["current_plan"], args["target_plan"])
context.add_message("system", f"Plan Comparison: {comparison}")
return await self.process(context)
case "create_upgrade_offer":
offer = self._build_offer(context.customer, args)
return offer
content = response.content or ""
if any(kw in content.lower() for kw in ["bug", "error", "broken", "not working"]):
return HandoffSignal(
target_agent=AgentType.TROUBLESHOOTER,
reason="Customer described a technical issue during sales conversation.",
)
return content
def _compare_plans(self, current: str, target: str) -> str:
return f"Upgrading from {current} to {target}: +10 API seats, +unlimited integrations, +priority support, +advanced analytics."
def _build_offer(self, customer, args: dict) -> str:
discount = args.get("discount_percent", 0)
trial = args.get("trial_days", 14)
plan = args["target_plan"]
msg = f"Great news! I've prepared a personalized offer for you:\n\n"
msg += f"**{plan.title()} Plan**\n"
if discount:
msg += f"- {discount}% discount for the first 3 months\n"
if trial:
msg += f"- {trial}-day free trial to explore all features\n"
msg += f"\nWould you like me to activate this offer on your account?"
return msg
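The offer formatting is pure string assembly, so it is easy to unit-test separately from the LLM. Here is a standalone copy of the _build_offer logic, detached from the agent class:

```python
def build_offer(target_plan: str, discount_percent: int = 0, trial_days: int = 14) -> str:
    """Assemble the upgrade-offer message shown to the customer."""
    lines = [
        "Great news! I've prepared a personalized offer for you:",
        "",
        f"**{target_plan.title()} Plan**",
    ]
    if discount_percent:
        lines.append(f"- {discount_percent}% discount for the first 3 months")
    if trial_days:
        lines.append(f"- {trial_days}-day free trial to explore all features")
    lines.append("")
    lines.append("Would you like me to activate this offer on your account?")
    return "\n".join(lines)

offer = build_offer("professional", discount_percent=10)
# The rendered offer names the plan, the 10% discount, and the default 14-day trial
```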
Billing Support Agent
# agents/billing.py
import json
from agents.base import BaseAgent
from models import ConversationContext, HandoffSignal, AgentType
BILLING_TOOLS = [
{
"type": "function",
"function": {
"name": "lookup_invoice",
"description": "Look up invoice details for the customer.",
"parameters": {
"type": "object",
"properties": {
"invoice_id": {"type": "string"},
"months_back": {"type": "integer", "default": 3},
},
},
},
},
{
"type": "function",
"function": {
"name": "process_refund",
"description": "Initiate a refund for a specific invoice. Requires manager approval for amounts over $500.",
"parameters": {
"type": "object",
"properties": {
"invoice_id": {"type": "string"},
"amount": {"type": "number"},
"reason": {"type": "string"},
},
"required": ["invoice_id", "amount", "reason"],
},
},
},
]
class BillingAgent(BaseAgent):
agent_type = AgentType.BILLING
system_prompt = """You are a billing support specialist for a SaaS platform.
Your responsibilities:
1. Answer questions about invoices, charges, and payment methods
2. Process refund requests following company policy
3. Explain billing cycles and proration
4. Help customers update payment information (direct them to the settings page)
Refund policy:
- Refunds under $100: auto-approve
- Refunds $100-$500: approve with documentation
- Refunds over $500: require manager approval (escalate)
Guidelines:
- Always verify the invoice before discussing charges
- Be transparent about all fees
- If the customer asks about upgrading, hand off to the upseller"""
async def process(self, context: ConversationContext) -> str | HandoffSignal:
response = await self.call_llm(context, tools=BILLING_TOOLS)
if response.tool_calls:
tool_call = response.tool_calls[0]
fn_name = tool_call.function.name
args = json.loads(tool_call.function.arguments)
match fn_name:
case "lookup_invoice":
invoice_data = await self._lookup_invoice(args)
context.add_message("system", f"Invoice Data: {invoice_data}")
return await self.process(context)
case "process_refund":
result = await self._process_refund(args)
return result
content = response.content or ""
if any(kw in content.lower() for kw in ["upgrade", "higher plan", "more features"]):
return HandoffSignal(
target_agent=AgentType.UPSELLER,
reason="Customer expressed interest in upgrading during billing discussion.",
)
return content
async def _lookup_invoice(self, args: dict) -> str:
invoice_id = args.get("invoice_id", "latest")
return f"Invoice {invoice_id}: $99.00 charged on 2026-03-01 for Professional Plan. Status: Paid."
async def _process_refund(self, args: dict) -> str:
amount = args["amount"]
if amount > 500:
return f"Your refund request for ${amount:.2f} requires manager approval. I've submitted it for review and you'll receive an email confirmation within 24 hours."
return f"Your refund of ${amount:.2f} has been approved and will appear on your statement within 5-7 business days. Reason logged: {args['reason']}"
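Note that _process_refund above only enforces the over-$500 escalation; the $100-$500 "approve with documentation" tier from the system prompt is left to the model. If you want the policy enforced in code rather than in the prompt, a pure function makes it testable (a sketch using the thresholds stated above):

```python
def refund_decision(amount: float) -> str:
    """Apply the tiered refund policy: auto-approve, document, or escalate."""
    if amount < 100:
        return "auto_approve"
    if amount <= 500:
        return "approve_with_documentation"
    return "escalate_to_manager"

# refund_decision(50)  -> "auto_approve"
# refund_decision(250) -> "approve_with_documentation"
# refund_decision(750) -> "escalate_to_manager"
```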
Expected Result: You now have four agents, each with focused responsibilities. The Troubleshooter can search a knowledge base and check service status. The Upseller can compare plans and generate offers. The Billing agent can look up invoices and process refunds. Each agent can signal a handoff when the conversation drifts outside its domain.
Step 5: Implement Agent Handoffs
The orchestrator ties everything together. It receives messages, dispatches them to the current agent, and processes handoff signals.
# orchestrator.py
import logging
from models import ConversationContext, HandoffSignal, AgentType, Message
from agents.router import RouterAgent
from agents.troubleshooter import TroubleshooterAgent
from agents.upseller import UpsellerAgent
from agents.billing import BillingAgent
from agents.base import BaseAgent
import config
logger = logging.getLogger(__name__)
class SwarmOrchestrator:
def __init__(self):
self.agents: dict[AgentType, BaseAgent] = {
AgentType.ROUTER: RouterAgent(),
AgentType.TROUBLESHOOTER: TroubleshooterAgent(),
AgentType.UPSELLER: UpsellerAgent(),
AgentType.BILLING: BillingAgent(),
}
self.conversations: dict[str, ConversationContext] = {}
def get_or_create_context(
self, conversation_id: str, customer_data: dict
) -> ConversationContext:
if conversation_id not in self.conversations:
from models import CustomerProfile
customer = CustomerProfile(**customer_data)
self.conversations[conversation_id] = ConversationContext(
conversation_id=conversation_id,
customer=customer,
)
return self.conversations[conversation_id]
async def handle_message(
self, conversation_id: str, customer_data: dict, user_message: str
) -> dict:
context = self.get_or_create_context(conversation_id, customer_data)
context.add_message("user", user_message)
handoff_count = 0
response_text = ""
while handoff_count < config.MAX_HANDOFFS_PER_TURN:
current_agent = self.agents[context.current_agent]
logger.info(
f"[{conversation_id}] Processing with {context.current_agent.value} agent"
)
result = await current_agent.process(context)
if isinstance(result, HandoffSignal):
logger.info(
f"[{conversation_id}] Handoff: {context.current_agent.value} -> "
f"{result.target_agent.value} (reason: {result.reason})"
)
context.handoff_history.append(result)
context.current_agent = result.target_agent
context.metadata.update(result.context_update)
handoff_count += 1
continue
# Agent returned a text response
response_text = result
context.add_message("assistant", response_text, agent=context.current_agent)
break
if handoff_count >= config.MAX_HANDOFFS_PER_TURN:
response_text = (
"I'm having trouble routing your request. "
"Let me connect you with a human agent who can help directly."
)
context.add_message("assistant", response_text)
return {
"conversation_id": conversation_id,
"response": response_text,
"agent": context.current_agent.value,
"handoffs": [h.model_dump() for h in context.handoff_history],
}
Expected Result: When you send a message like “I keep getting 500 errors on the /users endpoint,” the orchestrator first invokes the Router, which produces a handoff to the Troubleshooter. The orchestrator then invokes the Troubleshooter, which searches the knowledge base and returns a diagnostic response. All of this happens in a single call to
handle_message. The handoff history is recorded for analytics.
Step 6: Add Memory & Context
For the system to feel intelligent across multiple interactions, you need persistent memory that survives beyond a single request cycle.
Conversation History Across Handoffs
The ConversationContext model already carries the full message history. When a handoff occurs, the new agent receives every prior message, including messages generated by other agents. This means the Sales agent knows what the Troubleshooter already discussed, and vice versa.
To make this memory persistent across sessions (for example, if the customer comes back the next day), add a storage layer:
# storage.py
import json
from pathlib import Path
from models import ConversationContext
STORAGE_DIR = Path("./conversation_store")
STORAGE_DIR.mkdir(exist_ok=True)
async def save_context(context: ConversationContext) -> None:
file_path = STORAGE_DIR / f"{context.conversation_id}.json"
file_path.write_text(context.model_dump_json(indent=2))
async def load_context(conversation_id: str) -> ConversationContext | None:
file_path = STORAGE_DIR / f"{conversation_id}.json"
if file_path.exists():
data = json.loads(file_path.read_text())
return ConversationContext(**data)
return None
Customer Profile Context
Enrich the customer profile with live data from your SaaS backend before each interaction:
# enrichment.py
from models import CustomerProfile
async def enrich_customer_profile(customer_id: str) -> dict:
"""
Fetch live customer data from your SaaS backend.
Replace this with actual API calls to your CRM, billing system, etc.
"""
# Example: Pull from your database or internal API
return {
"customer_id": customer_id,
"name": "Jane Doe",
"plan": "professional",
"mrr": 99.0,
"open_tickets": 2,
"last_login": "2026-03-09T14:30:00Z",
"tags": ["high-value", "api-heavy-user"],
}
Expected Result: Conversations persist across sessions. If a customer contacts you today about a billing issue and returns tomorrow with a follow-up, the system recalls the full history. The customer profile is enriched in real time, so agents always have up-to-date context about the customer’s plan, usage, and account status.
Step 7: Integration with SaaS Backend
REST API Wrapper
Wrap the orchestrator in a FastAPI application so any service in your stack can send customer messages and receive agent responses:
# main.py
import uuid
import logging
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from orchestrator import SwarmOrchestrator
from enrichment import enrich_customer_profile
from storage import save_context
logging.basicConfig(level=logging.INFO)
app = FastAPI(title="Swarm Customer Success API", version="1.0.0")
orchestrator = SwarmOrchestrator()
class MessageRequest(BaseModel):
customer_id: str
message: str
conversation_id: str | None = None
class MessageResponse(BaseModel):
conversation_id: str
response: str
agent: str
handoffs: list[dict]
@app.post("/api/v1/message", response_model=MessageResponse)
async def send_message(req: MessageRequest):
conversation_id = req.conversation_id or str(uuid.uuid4())
try:
customer_data = await enrich_customer_profile(req.customer_id)
except Exception:
raise HTTPException(status_code=404, detail="Customer not found")
result = await orchestrator.handle_message(
conversation_id=conversation_id,
customer_data=customer_data,
user_message=req.message,
)
# Persist conversation state
context = orchestrator.conversations.get(conversation_id)
if context:
await save_context(context)
return MessageResponse(**result)
@app.get("/api/v1/conversation/{conversation_id}")
async def get_conversation(conversation_id: str):
context = orchestrator.conversations.get(conversation_id)
if not context:
raise HTTPException(status_code=404, detail="Conversation not found")
return {
"conversation_id": context.conversation_id,
"current_agent": context.current_agent.value,
"message_count": len(context.messages),
"handoff_count": len(context.handoff_history),
}
Webhook Integration
For real-time notifications (for example, notifying Slack when a ticket is escalated), add a webhook dispatcher:
# webhooks.py
import httpx
import logging
from models import ConversationContext
logger = logging.getLogger(__name__)
WEBHOOK_ENDPOINTS = {
"escalation": "https://your-app.com/webhooks/escalation",
"upsell_opportunity": "https://your-app.com/webhooks/upsell",
"conversation_complete": "https://your-app.com/webhooks/complete",
}
async def dispatch_webhook(event_type: str, context: ConversationContext, payload: dict):
url = WEBHOOK_ENDPOINTS.get(event_type)
if not url:
logger.warning(f"No webhook configured for event: {event_type}")
return
webhook_data = {
"event": event_type,
"conversation_id": context.conversation_id,
"customer_id": context.customer.customer_id,
"current_agent": context.current_agent.value,
"payload": payload,
}
async with httpx.AsyncClient() as client:
try:
resp = await client.post(url, json=webhook_data, timeout=10.0)
resp.raise_for_status()
logger.info(f"Webhook dispatched: {event_type} -> {resp.status_code}")
except httpx.HTTPError as e:
logger.error(f"Webhook failed for {event_type}: {e}")
Start the server and test it:
uvicorn main:app --reload --port 8000
# Test with curl:
curl -X POST http://localhost:8000/api/v1/message \
-H "Content-Type: application/json" \
-d '{
"customer_id": "cust_12345",
"message": "My API calls keep returning 500 errors since this morning"
}'
Expected Result: The API returns a JSON response containing the agent’s reply, which agent handled the request, and the full handoff chain. The conversation is persisted to disk. You can send follow-up messages using the same
conversation_id and the system will maintain full context.
Testing the System
Build a test script that exercises the full handoff chain:
# test_swarm.py
import asyncio
from orchestrator import SwarmOrchestrator
CUSTOMER = {
"customer_id": "cust_test_001",
"name": "Test User",
"plan": "starter",
"mrr": 29.0,
"open_tickets": 1,
"tags": ["trial-convert"],
}
SCENARIOS = [
{
"name": "Technical Issue -> Troubleshooter",
"messages": [
"My webhook endpoints stopped receiving events about 2 hours ago.",
"I checked and my endpoint URL is correct. The logs show nothing incoming.",
],
},
{
"name": "Upgrade Inquiry -> Upseller",
"messages": [
"I'm interested in getting more API seats for my team.",
"What's the difference between Professional and Enterprise?",
],
},
{
"name": "Billing -> Sales Handoff",
"messages": [
"I have a question about my last invoice.",
"Actually, while I'm here, can I also ask about upgrading my plan?",
],
},
]
async def run_tests():
orch = SwarmOrchestrator()
for scenario in SCENARIOS:
print(f"\n{'='*60}")
print(f"SCENARIO: {scenario['name']}")
print(f"{'='*60}")
conv_id = f"test_{scenario['name'].replace(' ', '_').lower()}"
for msg in scenario["messages"]:
print(f"\nCustomer: {msg}")
result = await orch.handle_message(conv_id, CUSTOMER, msg)
print(f"Agent ({result['agent']}): {result['response']}")
if result["handoffs"]:
print(f" Handoffs: {[h['target_agent'] for h in result['handoffs']]}")
if __name__ == "__main__":
asyncio.run(run_tests())
Run with python test_swarm.py and verify that:
- Technical issues get routed to the Troubleshooter
- Upgrade questions reach the Upseller
- Billing questions that evolve into upgrade interest trigger a handoff from Billing to Upseller
- No infinite handoff loops occur
FAQ
Q: How does this differ from using a single agent with multiple tools?
A single agent with many tools suffers from “tool confusion” as the tool count grows. With the Swarm pattern, each agent has a small, focused tool set (typically 2-4 tools), which dramatically improves accuracy. The LLM performs better when its instructions and capabilities are narrow and well-defined.
Q: What happens if two agents keep handing off to each other?
The orchestrator enforces a MAX_HANDOFFS_PER_TURN limit (defaulting to 5). If this limit is reached, it breaks the loop and escalates to a human agent. In practice, well-designed agent boundaries prevent this from happening.
Q: Can I use a cheaper model for the Router and a more capable model for specialists?
Absolutely. This is a recommended optimization. The Router performs a simple classification task and works well with gpt-4o-mini, while specialist agents that need to reason about complex technical issues benefit from gpt-4o. Modify the call_llm method on the shared BaseAgent class to accept a model override.
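One minimal way to implement that override is a per-agent model map consulted when building the request (a sketch; the specific model split below is the recommendation from this answer, not something the code earlier in the guide ships with):

```python
# Map each agent type to the model it should use; fall back to the default.
MODEL_OVERRIDES = {
    "router": "gpt-4o-mini",      # cheap, fast intent classification
    "troubleshooter": "gpt-4o",   # complex technical reasoning
}
DEFAULT_MODEL = "gpt-4o"

def model_for(agent_type: str) -> str:
    """Pick the model for an agent, defaulting when no override is set."""
    return MODEL_OVERRIDES.get(agent_type, DEFAULT_MODEL)

# model_for("router") picks the cheaper model; unlisted agents get the default
```

In call_llm you would then pass model=model_for(self.agent_type.value) instead of the global config.MODEL.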
Q: How do I add a new specialist agent?
Create a new class that extends BaseAgent, define its agent_type, system_prompt, and process method, then register it in the SwarmOrchestrator.__init__ agents dictionary and add the new AgentType to the RouterAgent tool definition.
Q: Is this production-ready?
The architecture is production-grade, but you will want to add: rate limiting on the API endpoints, proper database storage instead of JSON files, authentication and authorization, observability (structured logging, distributed tracing), and retry logic with exponential backoff on OpenAI API calls.
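For the retry item in that list, a minimal exponential-backoff wrapper might look like this (a sketch; in production you would catch the OpenAI SDK's specific rate-limit and timeout errors rather than bare Exception):

```python
import asyncio
import random

async def with_backoff(fn, max_attempts: int = 5, base_delay: float = 0.5):
    """Retry an async callable with exponential backoff plus a little jitter."""
    for attempt in range(max_attempts):
        try:
            return await fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of retries: surface the error to the caller
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
            await asyncio.sleep(delay)

# Stub that fails twice before succeeding, standing in for a flaky API call
calls = {"n": 0}
async def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("rate limited")
    return "ok"

# asyncio.run(with_backoff(flaky, base_delay=0.01)) succeeds on the third attempt
```

Wrapping the client.chat.completions.create call in BaseAgent.call_llm with such a helper keeps the retry policy in one place.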
Next Steps
Now that you have a working multi-agent customer success system, consider these enhancements:
- Add observability. Integrate structured logging with tools like Datadog or OpenTelemetry to track handoff patterns, agent response times, and resolution rates across your swarm.
- Implement evaluation. Build a test suite that measures routing accuracy, resolution quality, and customer satisfaction scores against a labeled dataset of historical support tickets.
- Connect real data sources. Replace the stubbed knowledge base and invoice lookups with actual integrations to your documentation platform, billing system, and CRM.
- Add streaming. Modify the FastAPI endpoint to use Server-Sent Events so customers see agent responses as they are generated rather than waiting for the full completion.
- Deploy with containers. Package each agent as an independent service behind a message queue (such as Redis Streams or RabbitMQ) for horizontal scaling and fault isolation.
- Build an analytics dashboard. Use the handoff history data to visualize common customer journeys, identify bottlenecks, and discover which agent transitions indicate product problems worth fixing.
Related Manuals
AI Agentic Workflow for B2B Lead Generation on LinkedIn
A deep dive into building an autonomous AI agent to identify, research, and engage high-value B2B prospects on LinkedIn using Claude and n8n.
Self-Improving Content Agents: Automating the Viral Content Loop on n8n
How to build an AI agent that doesn't just post content, but analyzes real-time engagement data to optimize its next viral hook autonomously.
[Must Read] Stop Just Chatting! 5 AI Agent Patterns to Multiply Your Side-Hustle Income
Simple prompts are not enough. Mastering these 5 patterns will turn AI into your most powerful earning partner in the side-hustle economy.