ai-agents Difficulty: Advanced

Harnessing OpenAI Swarm for Complex SaaS Automations

Is multi-agent orchestration the next frontier? Learn how to implement OpenAI Swarm patterns within your SaaS backend to handle non-linear customer success and sales workflows.

What You’ll Build

In this guide, you will build a multi-agent customer success system inspired by the OpenAI Swarm pattern. By the end, you will have:

  • A Router Agent that analyzes incoming customer requests and intelligently dispatches them to the right specialist
  • A Technical Troubleshooter Agent that searches documentation and logs to resolve issues
  • A Sales Upseller Agent that identifies upgrade opportunities during support interactions
  • A Billing Support Agent that handles invoice and payment questions
  • A handoff mechanism that allows agents to transfer conversations seamlessly, preserving full context
  • A REST API wrapper so you can plug the entire system into your existing SaaS backend

The result is a customer success pipeline that feels proactive rather than reactive, handling non-linear workflows that are difficult to express with traditional if-this-then-that automation.

Prerequisites

Requirement | Details
Python | Version 3.10 or higher (we use match statements and modern async features)
OpenAI API Key | An active key with access to gpt-4o or gpt-4o-mini
Async Python | Basic familiarity with asyncio, async/await, and type hints
Package Manager | pip or uv for dependency management
Optional | A running SaaS backend with REST endpoints for live integration

Architecture Overview

The system follows a hub-and-spoke architecture. The Router Agent sits at the center, receiving every inbound customer message. It classifies the intent and hands the conversation off to the appropriate specialist agent. Each specialist can either resolve the request or hand it back to the router for re-classification.

Customer Message
       │
       ▼
┌──────────────┐
│ Router Agent │  ← Classifies intent
└──────┬───────┘
       │
  ┌────┼─────────────┐
  ▼    ▼              ▼
┌────┐ ┌──────┐  ┌─────────┐
│Tech│ │Sales │  │ Billing │
│    │ │Upsell│  │ Support │
└────┘ └──────┘  └─────────┘

Every agent shares a common ConversationContext object. When a handoff occurs, the full history travels with it, so the receiving agent has complete awareness of what has already been discussed.

Step 1: Understanding the Swarm Pattern

Traditional chatbot architectures use a single monolithic prompt that tries to handle every scenario. This approach breaks down quickly in SaaS environments where customer interactions span billing, technical support, account management, and sales in a single conversation.

The Swarm pattern solves this by decomposing the problem into specialized agents, each with a focused system prompt, a targeted tool set, and clear boundaries for what it can and cannot handle.

Multi-Agent Orchestration Concepts

There are three core principles:

  1. Single Responsibility — Each agent does one thing well. The troubleshooter never discusses pricing; the upseller never debugs API errors.
  2. Explicit Handoff — Agents do not silently redirect. They produce a structured handoff object that the orchestrator interprets, ensuring auditability.
  3. Shared Context — All agents read from and write to the same conversation history, so the customer never has to repeat themselves.

Agent Handoff Mechanism

A handoff is a structured signal from one agent to the orchestrator. It contains:

  • target_agent: which specialist should take over
  • reason: why the handoff is happening (used for logging and analytics)
  • context_update: any new information the current agent discovered that the next agent needs

The orchestrator receives this signal, updates the shared context, and invokes the target agent on the next turn.
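
The three steps in that last sentence can be sketched in a few lines. This is a minimal illustration, assuming plain dataclasses as stand-ins for the Pydantic models defined in Step 2; apply_handoff is a hypothetical helper name, not part of any OpenAI library.

```python
from dataclasses import dataclass, field


@dataclass
class HandoffSignal:
    target_agent: str
    reason: str
    context_update: dict = field(default_factory=dict)


@dataclass
class Context:
    current_agent: str = "router"
    metadata: dict = field(default_factory=dict)
    handoff_history: list = field(default_factory=list)


def apply_handoff(context: Context, signal: HandoffSignal) -> Context:
    """Record the handoff, merge new facts, and switch the active agent."""
    context.handoff_history.append(signal)          # audit trail
    context.metadata.update(signal.context_update)  # shared context
    context.current_agent = signal.target_agent     # who handles the next step
    return context


ctx = apply_handoff(
    Context(),
    HandoffSignal("billing", "Customer asked about an invoice", {"invoice_id": "inv_001"}),
)
print(ctx.current_agent, ctx.metadata)
```

Keeping the state change in one function means every handoff passes through the same audit point, which is the property the orchestrator in Step 5 relies on.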

Key Concept: The Swarm pattern is not about agents talking to each other directly. It is about agents communicating through a central orchestrator that manages state, enforces routing rules, and maintains a complete audit trail.

Step 2: Set Up the Project

Start by creating the project structure and installing dependencies.

# Install dependencies
# pip install openai pydantic fastapi uvicorn httpx

# Project structure:
# swarm_saas/
# ├── main.py              # FastAPI entry point
# ├── orchestrator.py      # Central orchestration engine
# ├── agents/
# │   ├── __init__.py
# │   ├── base.py          # Base agent class
# │   ├── router.py        # Router agent
# │   ├── troubleshooter.py
# │   ├── upseller.py
# │   └── billing.py
# ├── models.py            # Pydantic data models
# └── config.py            # Configuration and prompts

Define the shared data models that every agent will use:

# models.py
from pydantic import BaseModel, Field
from enum import Enum
from datetime import datetime, timezone


def _utc_now() -> datetime:
    # datetime.utcnow() is deprecated since Python 3.12; return an aware UTC timestamp.
    return datetime.now(timezone.utc)


class AgentType(str, Enum):
    ROUTER = "router"
    TROUBLESHOOTER = "troubleshooter"
    UPSELLER = "upseller"
    BILLING = "billing"


class Message(BaseModel):
    role: str
    content: str
    agent: AgentType | None = None
    timestamp: datetime = Field(default_factory=_utc_now)


class CustomerProfile(BaseModel):
    customer_id: str
    name: str
    plan: str = "free"
    mrr: float = 0.0
    open_tickets: int = 0
    last_login: datetime | None = None
    tags: list[str] = Field(default_factory=list)


class HandoffSignal(BaseModel):
    target_agent: AgentType
    reason: str
    context_update: dict = Field(default_factory=dict)


class ConversationContext(BaseModel):
    conversation_id: str
    customer: CustomerProfile
    messages: list[Message] = Field(default_factory=list)
    current_agent: AgentType = AgentType.ROUTER
    metadata: dict = Field(default_factory=dict)
    handoff_history: list[HandoffSignal] = Field(default_factory=list)

    def add_message(self, role: str, content: str, agent: AgentType | None = None):
        self.messages.append(Message(role=role, content=content, agent=agent))

    def get_history_for_prompt(self) -> list[dict]:
        return [{"role": m.role, "content": m.content} for m in self.messages]

Set up the configuration file:

# config.py
import os

OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
MODEL = "gpt-4o"
TEMPERATURE = 0.3
MAX_HANDOFFS_PER_TURN = 5  # Safety limit to prevent infinite loops

Expected Result: You should have a clean project directory with all data models defined. Run python -c "from models import ConversationContext; print('Models OK')" to verify everything imports correctly.
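
One caveat with config.py as written: os.environ["OPENAI_API_KEY"] raises a bare KeyError at import time when the variable is missing. The sketch below shows one way to fail with a clearer message. The Settings class, the load_settings function, and the SWARM_* variable names are all assumptions for illustration, not part of the original configuration.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    api_key: str
    model: str = "gpt-4o"
    temperature: float = 0.3
    max_handoffs_per_turn: int = 5


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Build Settings from environment variables, failing with a clear message."""
    api_key = env.get("OPENAI_API_KEY", "").strip()
    if not api_key:
        raise RuntimeError("OPENAI_API_KEY is not set; export it before starting the app.")
    return Settings(
        api_key=api_key,
        # SWARM_* names are hypothetical overrides, not defined in config.py.
        model=env.get("SWARM_MODEL", "gpt-4o"),
        temperature=float(env.get("SWARM_TEMPERATURE", "0.3")),
        max_handoffs_per_turn=int(env.get("SWARM_MAX_HANDOFFS", "5")),
    )


settings = load_settings({"OPENAI_API_KEY": "sk-example", "SWARM_MODEL": "gpt-4o-mini"})
print(settings.model, settings.max_handoffs_per_turn)
```

Passing the environment in as a mapping keeps the loader easy to test without touching real environment variables.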

Step 3: Build the Router Agent

The Router Agent is the entry point for every customer message. Its job is to classify intent and produce a handoff signal.

# agents/base.py
from abc import ABC, abstractmethod
from openai import AsyncOpenAI
from openai.types.chat import ChatCompletionMessage
from models import ConversationContext, HandoffSignal, AgentType
import config


client = AsyncOpenAI(api_key=config.OPENAI_API_KEY)


class BaseAgent(ABC):
    agent_type: AgentType
    system_prompt: str

    @abstractmethod
    async def process(self, context: ConversationContext) -> str | HandoffSignal:
        """Process a message and return a response or a handoff signal."""
        ...

    async def call_llm(
        self,
        context: ConversationContext,
        tools: list[dict] | None = None,
    ) -> ChatCompletionMessage:
        # Returns the SDK message object (with .content and .tool_calls), not a dict.
        messages = [
            {"role": "system", "content": self.system_prompt},
            *context.get_history_for_prompt(),
        ]

        kwargs = {
            "model": config.MODEL,
            "messages": messages,
            "temperature": config.TEMPERATURE,
        }
        if tools:
            kwargs["tools"] = tools
            kwargs["tool_choice"] = "auto"

        response = await client.chat.completions.create(**kwargs)
        return response.choices[0].message

Now build the Router itself:

# agents/router.py
import json
from agents.base import BaseAgent
from models import ConversationContext, HandoffSignal, AgentType


ROUTER_TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "route_to_agent",
            "description": "Route the customer to a specialist agent based on their intent.",
            "parameters": {
                "type": "object",
                "properties": {
                    "target_agent": {
                        "type": "string",
                        "enum": ["troubleshooter", "upseller", "billing"],
                        "description": "The specialist agent to hand off to.",
                    },
                    "reason": {
                        "type": "string",
                        "description": "Brief explanation of why this routing was chosen.",
                    },
                },
                "required": ["target_agent", "reason"],
            },
        },
    }
]


class RouterAgent(BaseAgent):
    agent_type = AgentType.ROUTER
    system_prompt = """You are a customer success router for a SaaS platform.

Your ONLY job is to understand the customer's intent and route them to the correct specialist:

- "troubleshooter": Technical issues, bugs, API errors, integration problems, feature questions
- "upseller": Upgrade inquiries, plan comparisons, feature requests for higher tiers, pricing
- "billing": Invoice questions, payment failures, refund requests, subscription changes

Analyze the customer's message carefully. If the intent is ambiguous, ask ONE clarifying question.
Always use the route_to_agent tool when you have identified the intent.

Customer profile is provided for context. Use it to inform routing decisions."""

    async def process(self, context: ConversationContext) -> str | HandoffSignal:
        # Inject customer profile into the conversation context
        profile_msg = (
            f"[Customer Profile] ID: {context.customer.customer_id}, "
            f"Plan: {context.customer.plan}, MRR: ${context.customer.mrr}, "
            f"Open Tickets: {context.customer.open_tickets}, "
            f"Tags: {', '.join(context.customer.tags) or 'none'}"
        )
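        # model_copy() is shallow, so inserting into the copy's messages list
        # would also mutate the real history. Build a new list instead.
        from models import Message

        augmented_context = context.model_copy(
            update={
                "messages": [
                    Message(role="system", content=profile_msg, agent=AgentType.ROUTER),
                    *context.messages,
                ]
            }
        )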

        response = await self.call_llm(augmented_context, tools=ROUTER_TOOLS)

        # Check if the model wants to call the routing tool
        if response.tool_calls:
            tool_call = response.tool_calls[0]
            args = json.loads(tool_call.function.arguments)
            return HandoffSignal(
                target_agent=AgentType(args["target_agent"]),
                reason=args["reason"],
            )

        # If no tool call, the router is asking a clarifying question.
        # content can be None in the SDK response, so fall back to an empty string.
        return response.content or ""

Expected Result: The Router Agent receives a customer message like “My API calls are returning 500 errors” and produces a HandoffSignal(target_agent=AgentType.TROUBLESHOOTER, reason="Customer reporting API 500 errors"). For an ambiguous message like “I need help with my account,” it asks a clarifying question instead.

Step 4: Build Specialist Agents

Each specialist agent has a focused system prompt and domain-specific tools.

Technical Troubleshooter Agent

# agents/troubleshooter.py
import json
from agents.base import BaseAgent
from models import ConversationContext, HandoffSignal, AgentType

TROUBLESHOOTER_TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "search_knowledge_base",
            "description": "Search the technical documentation and known issues database.",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Search query for the knowledge base.",
                    }
                },
                "required": ["query"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "check_service_status",
            "description": "Check the current status of a specific service or API endpoint.",
            "parameters": {
                "type": "object",
                "properties": {
                    "service_name": {
                        "type": "string",
                        "description": "Name of the service to check.",
                    }
                },
                "required": ["service_name"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "escalate_to_human",
            "description": "Escalate the ticket to a human support engineer.",
            "parameters": {
                "type": "object",
                "properties": {
                    "severity": {
                        "type": "string",
                        "enum": ["low", "medium", "high", "critical"],
                    },
                    "summary": {"type": "string"},
                },
                "required": ["severity", "summary"],
            },
        },
    },
]


class TroubleshooterAgent(BaseAgent):
    agent_type = AgentType.TROUBLESHOOTER
    system_prompt = """You are a senior technical support engineer for a SaaS platform.

Your responsibilities:
1. Diagnose technical issues reported by customers
2. Search the knowledge base for known solutions
3. Check service status when outages are suspected
4. Provide clear, step-by-step resolution instructions
5. Escalate to human engineers when the issue is beyond automated resolution

Guidelines:
- Always check service status if the customer reports errors or downtime
- Search the knowledge base before providing custom troubleshooting steps
- If you cannot resolve the issue in 3 exchanges, escalate to a human
- Be empathetic but concise
- If the customer mentions pricing or upgrades, signal a handoff to the upseller agent"""

    async def process(self, context: ConversationContext) -> str | HandoffSignal:
        response = await self.call_llm(context, tools=TROUBLESHOOTER_TOOLS)

        if response.tool_calls:
            tool_call = response.tool_calls[0]
            fn_name = tool_call.function.name
            args = json.loads(tool_call.function.arguments)

            match fn_name:
                case "search_knowledge_base":
                    result = await self._search_kb(args["query"])
                    context.add_message("assistant", f"[Searching KB: {args['query']}]")
                    context.add_message("system", f"KB Result: {result}")
                    return await self.process(context)  # Re-process with KB result

                case "check_service_status":
                    result = await self._check_status(args["service_name"])
                    context.add_message("system", f"Service Status: {result}")
                    return await self.process(context)

                case "escalate_to_human":
                    context.metadata["escalation"] = {
                        "severity": args["severity"],
                        "summary": args["summary"],
                    }
                    return f"I've escalated your issue to our engineering team with {args['severity']} priority. A human engineer will follow up within the next hour. Your reference number is #{context.conversation_id[:8].upper()}."

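        # Heuristic handoff check: this scans the agent's own reply, not the
        # customer's message, so it can misfire when the reply merely mentions a keyword.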
        content = response.content or ""
        if any(kw in content.lower() for kw in ["pricing", "upgrade", "plan change"]):
            return HandoffSignal(
                target_agent=AgentType.UPSELLER,
                reason="Customer expressed interest in pricing or upgrades during troubleshooting.",
            )

        return content

    async def _search_kb(self, query: str) -> str:
        # In production, this calls your actual knowledge base API
        # For now, return a simulated result
        return f"Found 3 articles matching '{query}'. Top result: Check API rate limits and ensure authentication tokens are not expired."

    async def _check_status(self, service_name: str) -> str:
        # In production, integrate with your status page API
        return f"Service '{service_name}' is operational. No incidents reported in the last 24 hours."
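
Note that the recursive return await self.process(context) calls above have no depth limit: if the model keeps requesting tools, the agent never returns. The following is a minimal sketch of an iterative alternative with a cap. MAX_TOOL_ROUNDS, run_with_tool_budget, and the stub functions are assumptions for illustration; the stubs stand in for the real LLM and tool calls.

```python
import asyncio
from typing import Awaitable, Callable

MAX_TOOL_ROUNDS = 3  # hypothetical cap, analogous to MAX_HANDOFFS_PER_TURN


async def run_with_tool_budget(
    call_llm: Callable[[list[dict]], Awaitable[dict]],
    run_tool: Callable[[str, dict], Awaitable[str]],
    history: list[dict],
) -> str:
    """Alternate LLM calls and tool executions, giving up after MAX_TOOL_ROUNDS."""
    for _ in range(MAX_TOOL_ROUNDS):
        reply = await call_llm(history)
        tool = reply.get("tool")
        if tool is None:
            return reply["content"]  # plain text answer: done
        result = await run_tool(tool["name"], tool["args"])
        history.append({"role": "system", "content": f"Tool result: {result}"})
    return "I could not resolve this automatically; escalating to a human engineer."


async def _demo() -> tuple[str, int]:
    calls = 0

    async def stub_llm(history: list[dict]) -> dict:
        # Stand-in for the model: always asks for another search.
        nonlocal calls
        calls += 1
        return {"tool": {"name": "search_knowledge_base", "args": {"query": "500"}}}

    async def stub_tool(name: str, args: dict) -> str:
        return "no match"

    answer = await run_with_tool_budget(stub_llm, stub_tool, [])
    return answer, calls


answer, llm_calls = asyncio.run(_demo())
print(llm_calls, answer)
```

The same loop shape would apply to the Upseller and Billing agents, which use the same recursive pattern.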

Sales Upseller Agent

# agents/upseller.py
import json
from agents.base import BaseAgent
from models import ConversationContext, HandoffSignal, AgentType


UPSELLER_TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "get_plan_comparison",
            "description": "Retrieve a comparison between the customer's current plan and a target plan.",
            "parameters": {
                "type": "object",
                "properties": {
                    "current_plan": {"type": "string"},
                    "target_plan": {"type": "string"},
                },
                "required": ["current_plan", "target_plan"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "create_upgrade_offer",
            "description": "Generate a personalized upgrade offer for the customer.",
            "parameters": {
                "type": "object",
                "properties": {
                    "target_plan": {"type": "string"},
                    "discount_percent": {"type": "integer", "minimum": 0, "maximum": 30},
                    "trial_days": {"type": "integer", "minimum": 0, "maximum": 30},
                },
                "required": ["target_plan"],
            },
        },
    },
]


class UpsellerAgent(BaseAgent):
    agent_type = AgentType.UPSELLER
    system_prompt = """You are a consultative sales specialist for a SaaS platform.

Your approach:
1. Understand the customer's needs and pain points FIRST
2. Map their needs to specific features in higher-tier plans
3. Present value before price
4. Offer trials or discounts when appropriate
5. Never be pushy -- if the customer is not interested, respect that

Available plans: Free, Starter ($29/mo), Professional ($99/mo), Enterprise ($299/mo)

Guidelines:
- Reference the customer's profile to personalize recommendations
- If the customer has technical issues, hand off to the troubleshooter
- Always frame upgrades in terms of business value, not features"""

    async def process(self, context: ConversationContext) -> str | HandoffSignal:
        response = await self.call_llm(context, tools=UPSELLER_TOOLS)

        if response.tool_calls:
            tool_call = response.tool_calls[0]
            fn_name = tool_call.function.name
            args = json.loads(tool_call.function.arguments)

            match fn_name:
                case "get_plan_comparison":
                    comparison = self._compare_plans(args["current_plan"], args["target_plan"])
                    context.add_message("system", f"Plan Comparison: {comparison}")
                    return await self.process(context)

                case "create_upgrade_offer":
                    offer = self._build_offer(context.customer, args)
                    return offer

        content = response.content or ""
        if any(kw in content.lower() for kw in ["bug", "error", "broken", "not working"]):
            return HandoffSignal(
                target_agent=AgentType.TROUBLESHOOTER,
                reason="Customer described a technical issue during sales conversation.",
            )

        return content

    def _compare_plans(self, current: str, target: str) -> str:
        return f"Upgrading from {current} to {target}: +10 API seats, +unlimited integrations, +priority support, +advanced analytics."

    def _build_offer(self, customer, args: dict) -> str:
        discount = args.get("discount_percent", 0)
        trial = args.get("trial_days", 14)
        plan = args["target_plan"]
        msg = "Great news! I've prepared a personalized offer for you:\n\n"
        msg += f"**{plan.title()} Plan**\n"
        if discount:
            msg += f"- {discount}% discount for the first 3 months\n"
        if trial:
            msg += f"- {trial}-day free trial to explore all features\n"
        msg += "\nWould you like me to activate this offer on your account?"
        return msg

Billing Support Agent

# agents/billing.py
import json
from agents.base import BaseAgent
from models import ConversationContext, HandoffSignal, AgentType

BILLING_TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "lookup_invoice",
            "description": "Look up invoice details for the customer.",
            "parameters": {
                "type": "object",
                "properties": {
                    "invoice_id": {"type": "string"},
                    "months_back": {"type": "integer", "default": 3},
                },
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "process_refund",
            "description": "Initiate a refund for a specific invoice. Requires manager approval for amounts over $500.",
            "parameters": {
                "type": "object",
                "properties": {
                    "invoice_id": {"type": "string"},
                    "amount": {"type": "number"},
                    "reason": {"type": "string"},
                },
                "required": ["invoice_id", "amount", "reason"],
            },
        },
    },
]


class BillingAgent(BaseAgent):
    agent_type = AgentType.BILLING
    system_prompt = """You are a billing support specialist for a SaaS platform.

Your responsibilities:
1. Answer questions about invoices, charges, and payment methods
2. Process refund requests following company policy
3. Explain billing cycles and proration
4. Help customers update payment information (direct them to the settings page)

Refund policy:
- Refunds under $100: auto-approve
- Refunds $100-$500: approve with documentation
- Refunds over $500: require manager approval (escalate)

Guidelines:
- Always verify the invoice before discussing charges
- Be transparent about all fees
- If the customer asks about upgrading, hand off to the upseller"""

    async def process(self, context: ConversationContext) -> str | HandoffSignal:
        response = await self.call_llm(context, tools=BILLING_TOOLS)

        if response.tool_calls:
            tool_call = response.tool_calls[0]
            fn_name = tool_call.function.name
            args = json.loads(tool_call.function.arguments)

            match fn_name:
                case "lookup_invoice":
                    invoice_data = await self._lookup_invoice(args)
                    context.add_message("system", f"Invoice Data: {invoice_data}")
                    return await self.process(context)

                case "process_refund":
                    result = await self._process_refund(args)
                    return result

        content = response.content or ""
        if any(kw in content.lower() for kw in ["upgrade", "higher plan", "more features"]):
            return HandoffSignal(
                target_agent=AgentType.UPSELLER,
                reason="Customer expressed interest in upgrading during billing discussion.",
            )

        return content

    async def _lookup_invoice(self, args: dict) -> str:
        invoice_id = args.get("invoice_id", "latest")
        return f"Invoice {invoice_id}: $99.00 charged on 2026-03-01 for Professional Plan. Status: Paid."

    async def _process_refund(self, args: dict) -> str:
        amount = args["amount"]
        if amount > 500:
            return f"Your refund request for ${amount:.2f} requires manager approval. I've submitted it for review and you'll receive an email confirmation within 24 hours."
        return f"Your refund of ${amount:.2f} has been approved and will appear on your statement within 5-7 business days. Reason logged: {args['reason']}"
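
The _process_refund stub only distinguishes amounts over $500, while the system prompt lists three tiers. A small pure function can make the policy explicit and testable. This is a sketch under one assumption worth checking against your real policy: amounts of exactly $100 and exactly $500 are treated as the middle tier. RefundDecision and classify_refund are hypothetical names.

```python
from enum import Enum


class RefundDecision(str, Enum):
    AUTO_APPROVE = "auto_approve"
    APPROVE_WITH_DOCUMENTATION = "approve_with_documentation"
    MANAGER_APPROVAL = "manager_approval"


def classify_refund(amount: float) -> RefundDecision:
    """Map a refund amount in USD to the tier stated in the system prompt."""
    if amount <= 0:
        raise ValueError("Refund amount must be positive")
    if amount < 100:
        return RefundDecision.AUTO_APPROVE
    if amount <= 500:  # assumption: both boundaries belong to the middle tier
        return RefundDecision.APPROVE_WITH_DOCUMENTATION
    return RefundDecision.MANAGER_APPROVAL


for amount in (49.0, 100.0, 500.0, 750.0):
    print(amount, classify_refund(amount).value)
```

Enforcing the tiers in code rather than only in the prompt means the policy holds even if the model passes an unexpected amount to process_refund.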

Expected Result: You now have four agents, each with focused responsibilities. The Troubleshooter can search a knowledge base and check service status. The Upseller can compare plans and generate offers. The Billing agent can look up invoices and process refunds. Each agent can signal a handoff when the conversation drifts outside its domain.

Step 5: Implement Agent Handoffs

The orchestrator ties everything together. It receives messages, dispatches them to the current agent, and processes handoff signals.

# orchestrator.py
import logging
from models import ConversationContext, HandoffSignal, AgentType, Message
from agents.router import RouterAgent
from agents.troubleshooter import TroubleshooterAgent
from agents.upseller import UpsellerAgent
from agents.billing import BillingAgent
from agents.base import BaseAgent
import config

logger = logging.getLogger(__name__)


class SwarmOrchestrator:
    def __init__(self):
        self.agents: dict[AgentType, BaseAgent] = {
            AgentType.ROUTER: RouterAgent(),
            AgentType.TROUBLESHOOTER: TroubleshooterAgent(),
            AgentType.UPSELLER: UpsellerAgent(),
            AgentType.BILLING: BillingAgent(),
        }
        self.conversations: dict[str, ConversationContext] = {}

    def get_or_create_context(
        self, conversation_id: str, customer_data: dict
    ) -> ConversationContext:
        if conversation_id not in self.conversations:
            from models import CustomerProfile

            customer = CustomerProfile(**customer_data)
            self.conversations[conversation_id] = ConversationContext(
                conversation_id=conversation_id,
                customer=customer,
            )
        return self.conversations[conversation_id]

    async def handle_message(
        self, conversation_id: str, customer_data: dict, user_message: str
    ) -> dict:
        context = self.get_or_create_context(conversation_id, customer_data)
        context.add_message("user", user_message)

        handoff_count = 0
        response_text = ""

        while handoff_count < config.MAX_HANDOFFS_PER_TURN:
            current_agent = self.agents[context.current_agent]

            logger.info(
                f"[{conversation_id}] Processing with {context.current_agent.value} agent"
            )

            result = await current_agent.process(context)

            if isinstance(result, HandoffSignal):
                logger.info(
                    f"[{conversation_id}] Handoff: {context.current_agent.value} -> "
                    f"{result.target_agent.value} (reason: {result.reason})"
                )

                context.handoff_history.append(result)
                context.current_agent = result.target_agent
                context.metadata.update(result.context_update)
                handoff_count += 1
                continue

            # Agent returned a text response
            response_text = result
            context.add_message("assistant", response_text, agent=context.current_agent)
            break

        if handoff_count >= config.MAX_HANDOFFS_PER_TURN:
            response_text = (
                "I'm having trouble routing your request. "
                "Let me connect you with a human agent who can help directly."
            )
            context.add_message("assistant", response_text)

        return {
            "conversation_id": conversation_id,
            "response": response_text,
            "agent": context.current_agent.value,
            "handoffs": [h.model_dump() for h in context.handoff_history],
        }

Expected Result: When you send a message like “I keep getting 500 errors on the /users endpoint,” the orchestrator first invokes the Router, which produces a handoff to the Troubleshooter. The orchestrator then invokes the Troubleshooter, which searches the knowledge base and returns a diagnostic response. All of this happens in a single call to handle_message. The handoff history is recorded for analytics.

Step 6: Add Memory & Context

For the system to feel intelligent across multiple interactions, you need persistent memory that survives beyond a single request cycle.

Conversation History Across Handoffs

The ConversationContext model already carries the full message history. When a handoff occurs, the new agent receives every prior message, including messages generated by other agents. This means the Upseller knows what the Troubleshooter already discussed, and vice versa.

To make this memory persistent across sessions (for example, if the customer comes back the next day), add a storage layer:

# storage.py
import json
from pathlib import Path
from models import ConversationContext

STORAGE_DIR = Path("./conversation_store")
STORAGE_DIR.mkdir(exist_ok=True)


async def save_context(context: ConversationContext) -> None:
    file_path = STORAGE_DIR / f"{context.conversation_id}.json"
    file_path.write_text(context.model_dump_json(indent=2))


async def load_context(conversation_id: str) -> ConversationContext | None:
    file_path = STORAGE_DIR / f"{conversation_id}.json"
    if file_path.exists():
        # Pydantic v2 can parse and validate the stored JSON in one step.
        return ConversationContext.model_validate_json(file_path.read_text())
    return None
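
Two things are worth adding before relying on this in production: the conversation_id arrives from the request and is used directly in a file path, and the blocking file I/O runs on the event loop. The sketch below addresses both and shows a load-before-create flow. It uses plain dicts instead of ConversationContext so it runs on its own; context_path, save_state, and load_or_create are hypothetical names, and the ID pattern assumes UUID-like identifiers.

```python
import asyncio
import json
import re
import tempfile
from pathlib import Path

# Assumption: conversation IDs are UUID-like (letters, digits, "-" and "_").
_SAFE_ID = re.compile(r"[A-Za-z0-9_-]{1,64}")


def context_path(storage_dir: Path, conversation_id: str) -> Path:
    """Reject IDs that could escape storage_dir (e.g. '../../etc/passwd')."""
    if not _SAFE_ID.fullmatch(conversation_id):
        raise ValueError(f"Invalid conversation_id: {conversation_id!r}")
    return storage_dir / f"{conversation_id}.json"


async def save_state(storage_dir: Path, conversation_id: str, state: dict) -> None:
    path = context_path(storage_dir, conversation_id)
    # Run blocking file I/O in a worker thread so the event loop stays responsive.
    await asyncio.to_thread(path.write_text, json.dumps(state), encoding="utf-8")


async def load_or_create(storage_dir: Path, conversation_id: str) -> dict:
    """Prefer state saved by an earlier process; otherwise start fresh."""
    path = context_path(storage_dir, conversation_id)
    if path.exists():
        return json.loads(await asyncio.to_thread(path.read_text, encoding="utf-8"))
    return {"conversation_id": conversation_id, "messages": []}


async def _demo() -> dict:
    with tempfile.TemporaryDirectory() as tmp:
        store = Path(tmp)
        state = await load_or_create(store, "conv-123")
        state["messages"].append({"role": "user", "content": "Hello"})
        await save_state(store, "conv-123", state)
        return await load_or_create(store, "conv-123")  # simulates a later session


restored = asyncio.run(_demo())
print(restored)
```

In the tutorial's code, the equivalent change would be to call load_context inside get_or_create_context before constructing a new ConversationContext.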

Customer Profile Context

Enrich the customer profile with live data from your SaaS backend before each interaction:

# enrichment.py
from models import CustomerProfile


async def enrich_customer_profile(customer_id: str) -> dict:
    """
    Fetch live customer data from your SaaS backend.
    Replace this with actual API calls to your CRM, billing system, etc.
    """
    # Example: Pull from your database or internal API
    return {
        "customer_id": customer_id,
        "name": "Jane Doe",
        "plan": "professional",
        "mrr": 99.0,
        "open_tickets": 2,
        "last_login": "2026-03-09T14:30:00Z",
        "tags": ["high-value", "api-heavy-user"],
    }

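Expected Result: Conversation state can be written to disk and read back, and customer data can be fetched from your backend on each request. Two caveats apply to the code as written. First, the orchestrator only consults its in-memory dictionary, so for a customer who returns the next day (or after a restart) to see the full history, load_context must be called before a new context is created. Second, get_or_create_context applies the enriched profile only when a conversation is first created, so later changes to the customer's plan or account status are not reflected in an existing conversation.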

Step 7: Integration with SaaS Backend

REST API Wrapper

Wrap the orchestrator in a FastAPI application so any service in your stack can send customer messages and receive agent responses:

# main.py
import uuid
import logging
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from orchestrator import SwarmOrchestrator
from enrichment import enrich_customer_profile
from storage import save_context

logging.basicConfig(level=logging.INFO)

app = FastAPI(title="Swarm Customer Success API", version="1.0.0")
orchestrator = SwarmOrchestrator()


class MessageRequest(BaseModel):
    customer_id: str
    message: str
    conversation_id: str | None = None


class MessageResponse(BaseModel):
    conversation_id: str
    response: str
    agent: str
    handoffs: list[dict]


@app.post("/api/v1/message", response_model=MessageResponse)
async def send_message(req: MessageRequest):
    conversation_id = req.conversation_id or str(uuid.uuid4())

    try:
        customer_data = await enrich_customer_profile(req.customer_id)
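    except Exception as exc:
        # Every enrichment failure is reported as 404 here; chain the original
        # error so backend outages remain visible in logs.
        raise HTTPException(status_code=404, detail="Customer not found") from exc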

    result = await orchestrator.handle_message(
        conversation_id=conversation_id,
        customer_data=customer_data,
        user_message=req.message,
    )

    # Persist conversation state
    context = orchestrator.conversations.get(conversation_id)
    if context:
        await save_context(context)

    return MessageResponse(**result)


@app.get("/api/v1/conversation/{conversation_id}")
async def get_conversation(conversation_id: str):
    context = orchestrator.conversations.get(conversation_id)
    if not context:
        raise HTTPException(status_code=404, detail="Conversation not found")
    return {
        "conversation_id": context.conversation_id,
        "current_agent": context.current_agent.value,
        "message_count": len(context.messages),
        "handoff_count": len(context.handoff_history),
    }

Webhook Integration

For real-time notifications (for example, notifying Slack when a ticket is escalated), add a webhook dispatcher:

# webhooks.py
import httpx
import logging
from models import ConversationContext

logger = logging.getLogger(__name__)

WEBHOOK_ENDPOINTS = {
    "escalation": "https://your-app.com/webhooks/escalation",
    "upsell_opportunity": "https://your-app.com/webhooks/upsell",
    "conversation_complete": "https://your-app.com/webhooks/complete",
}


async def dispatch_webhook(event_type: str, context: ConversationContext, payload: dict):
    url = WEBHOOK_ENDPOINTS.get(event_type)
    if not url:
        logger.warning(f"No webhook configured for event: {event_type}")
        return

    webhook_data = {
        "event": event_type,
        "conversation_id": context.conversation_id,
        "customer_id": context.customer.customer_id,
        "current_agent": context.current_agent.value,
        "payload": payload,
    }

    async with httpx.AsyncClient() as client:
        try:
            resp = await client.post(url, json=webhook_data, timeout=10.0)
            resp.raise_for_status()
            logger.info(f"Webhook dispatched: {event_type} -> {resp.status_code}")
        except httpx.HTTPError as e:
            logger.error(f"Webhook failed for {event_type}: {e}")

Start the server and test it:

uvicorn main:app --reload --port 8000

# Test with curl:
curl -X POST http://localhost:8000/api/v1/message \
  -H "Content-Type: application/json" \
  -d '{
    "customer_id": "cust_12345",
    "message": "My API calls keep returning 500 errors since this morning"
  }'

Expected Result: The API returns a JSON response containing the conversation_id, the agent’s reply, the agent that handled the request, and the full handoff chain. The conversation is persisted to disk. To send a follow-up, include the returned conversation_id in the next request and the system will maintain full context.

Testing the System

Build a test script that exercises the full handoff chain:

# test_swarm.py
import asyncio
from orchestrator import SwarmOrchestrator

CUSTOMER = {
    "customer_id": "cust_test_001",
    "name": "Test User",
    "plan": "starter",
    "mrr": 29.0,
    "open_tickets": 1,
    "tags": ["trial-convert"],
}

SCENARIOS = [
    {
        "name": "Technical Issue -> Troubleshooter",
        "messages": [
            "My webhook endpoints stopped receiving events about 2 hours ago.",
            "I checked and my endpoint URL is correct. The logs show nothing incoming.",
        ],
    },
    {
        "name": "Upgrade Inquiry -> Upseller",
        "messages": [
            "I'm interested in getting more API seats for my team.",
            "What's the difference between Professional and Enterprise?",
        ],
    },
    {
        "name": "Billing -> Sales Handoff",
        "messages": [
            "I have a question about my last invoice.",
            "Actually, while I'm here, can I also ask about upgrading my plan?",
        ],
    },
]


async def run_tests():
    orch = SwarmOrchestrator()

    for scenario in SCENARIOS:
        print(f"\n{'='*60}")
        print(f"SCENARIO: {scenario['name']}")
        print(f"{'='*60}")

        conv_id = f"test_{scenario['name'].replace(' ', '_').lower()}"

        for msg in scenario["messages"]:
            print(f"\nCustomer: {msg}")
            result = await orch.handle_message(conv_id, CUSTOMER, msg)
            print(f"Agent ({result['agent']}): {result['response']}")
            if result["handoffs"]:
                print(f"  Handoffs: {[h['target_agent'] for h in result['handoffs']]}")


if __name__ == "__main__":
    asyncio.run(run_tests())

Run with python test_swarm.py and verify that:

  1. Technical issues get routed to the Troubleshooter
  2. Upgrade questions reach the Upseller
  3. Billing questions that evolve into upgrade interest trigger a handoff from Billing to Upseller
  4. No infinite handoff loops occur
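
The checks above can also be expressed as assertions instead of being read off the console. The sketch below assumes the result shape used in test_swarm.py (an "agent" key and a "handoffs" list whose entries carry "target_agent"). The helper names and the agent name in the sample are placeholders, and treating any repeated handoff target as a loop is a deliberately strict heuristic that may need relaxing if your agents legitimately return to the same specialist within one turn.

```python
def handoff_targets(result: dict) -> list[str]:
    """Return the ordered list of agents a turn was handed to."""
    return [h["target_agent"] for h in result["handoffs"]]


def has_repeated_target(targets: list[str]) -> bool:
    """True if the same agent is handed to more than once in a single turn."""
    return len(targets) != len(set(targets))


def check_turn(result: dict, expected_agent: str) -> None:
    """Assert that the turn ended with the expected agent and did not loop."""
    targets = handoff_targets(result)
    assert result["agent"] == expected_agent, (
        f"expected {expected_agent}, got {result['agent']}"
    )
    assert not has_repeated_target(targets), f"repeated handoff target in {targets}"


# Hand-written sample in the shape handle_message returns in this guide.
# The agent name is a placeholder, not necessarily a real AgentType value.
sample = {
    "agent": "technical",
    "response": "Let's look at your webhook logs.",
    "handoffs": [{"target_agent": "technical"}],
}
check_turn(sample, "technical")
```

In run_tests, calling check_turn on the last result of each scenario would make a routing regression fail loudly rather than scroll past in the output.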

FAQ

Q: How does this differ from using a single agent with multiple tools? A single agent with many tools suffers from “tool confusion” as the tool count grows: the model is more likely to pick the wrong tool or pass it the wrong arguments. With the Swarm pattern, each agent has a small, focused tool set (typically 2-4 tools), so every decision is made among fewer options. The LLM tends to perform better when its instructions and capabilities are narrow and well-defined.

Q: What happens if two agents keep handing off to each other? The orchestrator enforces a MAX_HANDOFFS_PER_TURN limit (defaulting to 5). If this limit is reached, it breaks the loop and escalates to a human agent. Well-designed agent boundaries make this rare in practice, but the limit guarantees that a turn always terminates.

Q: Can I use a cheaper model for the Router and a more capable model for specialists? Absolutely. This is a recommended optimization. The Router performs a simple classification task and works well with gpt-4o-mini, while specialist agents that need to reason about complex technical issues benefit from gpt-4o. Modify the call_llm method on the shared BaseAgent class to accept a model override.

Q: How do I add a new specialist agent? Create a new class that extends BaseAgent, define its agent_type, system_prompt, and process method, then register it in the SwarmOrchestrator.__init__ agents dictionary and add the new AgentType to the RouterAgent tool definition.
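
The steps can be sketched as follows. The AgentType and BaseAgent definitions here are simplified stand-ins so the example runs on its own; the real classes in this guide have their own attributes and process signature, so treat OnboardingAgent, the ONBOARDING member, and ROUTER_TARGETS as illustrative names only.

```python
import asyncio
from enum import Enum


class AgentType(str, Enum):
    # Stand-in for the guide's AgentType; ONBOARDING is the new member.
    ROUTER = "router"
    ONBOARDING = "onboarding"


class BaseAgent:
    # Stand-in base class; the real one also wraps the LLM call.
    agent_type: AgentType
    system_prompt: str = ""

    async def process(self, context: dict, user_message: str) -> dict:
        raise NotImplementedError


class OnboardingAgent(BaseAgent):
    agent_type = AgentType.ONBOARDING
    system_prompt = "You guide new customers through first-time setup."

    async def process(self, context: dict, user_message: str) -> dict:
        # A real agent would call the LLM here; this stub only echoes.
        return {
            "agent": self.agent_type.value,
            "response": f"Let's get you set up: {user_message}",
        }


# Registration step, mirroring the agents dictionary in the orchestrator.
agents: dict[AgentType, BaseAgent] = {AgentType.ONBOARDING: OnboardingAgent()}

# The router's handoff tool must also list the new value as a valid target.
ROUTER_TARGETS = [t.value for t in AgentType if t is not AgentType.ROUTER]

result = asyncio.run(agents[AgentType.ONBOARDING].process({}, "How do I start?"))
print(result["agent"], ROUTER_TARGETS)
```

If the router's tool definition is not updated, the new agent is registered but unreachable, since the router can only hand off to targets it knows about.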

Q: Is this production-ready? The architecture is sound, but the code as written needs hardening before production. You will want to add: rate limiting on the API endpoints, proper database storage instead of JSON files, authentication and authorization, observability (structured logging, distributed tracing), and retry logic with exponential backoff on OpenAI API calls.
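
Of these, retry logic is the quickest to add. The helper below is a generic sketch of exponential backoff with jitter for any awaitable call; retry_with_backoff and its parameters are hypothetical names, and in real use retry_on should be narrowed to the transient errors (rate limits, timeouts, connection failures) raised by your client rather than every exception.

```python
import asyncio
import random


async def retry_with_backoff(make_call, *, attempts: int = 4,
                             base_delay: float = 0.5, max_delay: float = 8.0,
                             retry_on: tuple = (Exception,)):
    """Await make_call(), retrying on failure with exponential backoff and jitter."""
    for attempt in range(attempts):
        try:
            return await make_call()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            # The delay ceiling doubles each attempt, capped at max_delay.
            delay = min(max_delay, base_delay * 2 ** attempt)
            # Random jitter keeps many clients from retrying in lockstep.
            await asyncio.sleep(random.uniform(0, delay))


async def demo() -> str:
    state = {"calls": 0}

    async def flaky() -> str:
        # Fails twice, then succeeds, to exercise the retry path.
        state["calls"] += 1
        if state["calls"] < 3:
            raise RuntimeError("transient failure")
        return "ok"

    return await retry_with_backoff(flaky, base_delay=0.01)


print(asyncio.run(demo()))
```

The same wrapper could be applied around the LLM call in each agent or around the webhook POST, since both are awaitable calls that can fail transiently.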

Next Steps

Now that you have a working multi-agent customer success system, consider these enhancements:

  • Add observability. Integrate structured logging with tools like Datadog or OpenTelemetry to track handoff patterns, agent response times, and resolution rates across your swarm.
  • Implement evaluation. Build a test suite that measures routing accuracy, resolution quality, and customer satisfaction scores against a labeled dataset of historical support tickets.
  • Connect real data sources. Replace the stubbed knowledge base and invoice lookups with actual integrations to your documentation platform, billing system, and CRM.
  • Add streaming. Modify the FastAPI endpoint to use Server-Sent Events so customers see agent responses as they are generated rather than waiting for the full completion.
  • Deploy with containers. Package each agent as an independent service behind a message queue (such as Redis Streams or RabbitMQ) for horizontal scaling and fault isolation.
  • Build an analytics dashboard. Use the handoff history data to visualize common customer journeys, identify bottlenecks, and discover which agent transitions indicate product problems worth fixing.
Last Updated: 3/10/2026