Chapter 4: Agent Integration
Integrating MCP with AI Agents and Frameworks
This chapter explores how to integrate MCP servers with popular AI agent frameworks and build sophisticated multi-agent workflows. We'll cover integration patterns, best practices, and real-world examples of MCP-powered AI agents.
Understanding Agent-MCP Architecture
The Agent-MCP Ecosystem
Core Integration Patterns
- Direct Integration: Agent directly communicates with MCP servers
- Router Pattern: Centralized MCP router manages multiple servers
- Plugin Architecture: MCP servers as dynamically loaded plugins
- Service Mesh: Distributed MCP servers in microservices architecture
LangChain Integration
Setting Up LangChain with MCP
// agents/langchain-mcp-agent.ts
import { ChatOpenAI } from "@langchain/openai";
import { AgentExecutor, createToolCallingAgent } from "langchain/agents";
import { ChatPromptTemplate } from "@langchain/core/prompts";
import { Tool } from "@langchain/core/tools";
import { MCPClient } from "../mcp/client.js";
import { MCPToolWrapper } from "./mcp-tool-wrapper.js";
export class LangChainMCPAgent {
private llm: ChatOpenAI;
private mcpClients: Map<string, MCPClient>;
private tools: Tool[];
private agent: AgentExecutor;
constructor(config: {
openaiApiKey: string;
mcpServers: Array<{
name: string;
command: string;
args?: string[];
}>;
}) {
this.llm = new ChatOpenAI({
openAIApiKey: config.openaiApiKey,
modelName: "gpt-4-turbo",
temperature: 0,
});
this.mcpClients = new Map();
this.tools = [];
this.initializeMCPClients(config.mcpServers);
}
private async initializeMCPClients(
servers: Array<{ name: string; command: string; args?: string[] }>
): Promise<void> {
for (const server of servers) {
const client = new MCPClient(server.command, server.args);
await client.connect();
this.mcpClients.set(server.name, client);
// Convert MCP tools to LangChain tools
const mcpTools = await client.listTools();
for (const mcpTool of mcpTools) {
const langchainTool = new MCPToolWrapper(client, mcpTool, server.name);
this.tools.push(langchainTool);
}
}
}
async createAgent(): Promise<void> {
const prompt = ChatPromptTemplate.fromMessages([
[
"system",
`You are a helpful AI assistant with access to various tools through MCP (Model Context Protocol).
Available MCP servers: ${Array.from(this.mcpClients.keys()).join(", ")}
You can:
- Query databases and retrieve structured data
- Search and manipulate files
- Make HTTP requests to APIs
- Analyze code and documents
- Access knowledge bases and documentation
Always explain what you're doing and why you're using specific tools.
Be thorough in your analysis and provide detailed, helpful responses.`,
],
["human", "{input}"],
["placeholder", "{agent_scratchpad}"],
]);
const agent = await createToolCallingAgent({
llm: this.llm,
tools: this.tools,
prompt,
});
this.agent = new AgentExecutor({
agent,
tools: this.tools,
verbose: true,
maxIterations: 10,
returnIntermediateSteps: true,
});
}
async execute(input: string): Promise<{
output: string;
intermediateSteps: any[];
}> {
if (!this.agent) {
await this.createAgent();
}
try {
const result = await this.agent.invoke({
input,
});
return {
output: result.output,
intermediateSteps: result.intermediateSteps || [],
};
} catch (error) {
console.error("Agent execution failed:", error);
throw error;
}
}
async cleanup(): Promise<void> {
for (const client of this.mcpClients.values()) {
await client.disconnect();
}
this.mcpClients.clear();
}
}MCP Tool Wrapper for LangChain
// agents/mcp-tool-wrapper.ts
import { Tool } from "@langchain/core/tools";
import { z } from "zod";
import { MCPClient } from "../mcp/client.js";
export class MCPToolWrapper extends Tool {
name: string;
description: string;
schema: z.ZodSchema<any>;
constructor(
private mcpClient: MCPClient,
private mcpTool: any,
private serverName: string
) {
super();
this.name = `${serverName}_${mcpTool.name}`;
this.description = `[${serverName}] ${mcpTool.description}`;
// Convert JSON schema to Zod schema
this.schema = this.convertJsonSchemaToZod(mcpTool.inputSchema);
}
async _call(args: any): Promise<string> {
try {
const result = await this.mcpClient.callTool(this.mcpTool.name, args);
// Format the result for LangChain
if (result.content) {
return result.content
.map((content: any) => {
if (content.type === "text") {
return content.text;
} else if (content.type === "image") {
return `[Image: ${content.data.substring(0, 100)}...]`;
}
return JSON.stringify(content);
})
.join("\n");
}
return JSON.stringify(result);
} catch (error) {
return `Error executing tool ${this.mcpTool.name}: ${error.message}`;
}
}
private convertJsonSchemaToZod(jsonSchema: any): z.ZodSchema<any> {
// Simple JSON Schema to Zod conversion
// In production, use a proper conversion library
const properties = jsonSchema.properties || {};
const required = jsonSchema.required || [];
const zodObject: Record<string, z.ZodTypeAny> = {};
for (const [key, prop] of Object.entries(properties)) {
const propSchema = prop as any;
let zodType: z.ZodTypeAny;
switch (propSchema.type) {
case "string":
zodType = z.string();
if (propSchema.enum) {
zodType = z.enum(propSchema.enum);
}
break;
case "number":
zodType = z.number();
break;
case "boolean":
zodType = z.boolean();
break;
case "array":
zodType = z.array(z.any());
break;
case "object":
zodType = z.object({});
break;
default:
zodType = z.any();
}
if (propSchema.description) {
zodType = zodType.describe(propSchema.description);
}
if (!required.includes(key)) {
zodType = zodType.optional();
}
zodObject[key] = zodType;
}
return z.object(zodObject);
}
}Example LangChain MCP Usage
// examples/langchain-example.ts
import { LangChainMCPAgent } from "../agents/langchain-mcp-agent.js";
async function runLangChainExample() {
const agent = new LangChainMCPAgent({
openaiApiKey: process.env.OPENAI_API_KEY!,
mcpServers: [
{
name: "filesystem",
command: "node",
args: ["dist/servers/filesystem-server.js"],
},
{
name: "database",
command: "node",
args: ["dist/servers/database-server.js"],
},
{
name: "web-api",
command: "node",
args: ["dist/servers/api-server.js"],
},
],
});
try {
// Complex multi-step task
const result = await agent.execute(`
I need to analyze our customer database and create a comprehensive report.
1. First, query the database to get customer statistics for the last quarter
2. Then search for any related documentation files
3. Finally, fetch the latest market data from our API
4. Combine all this information into a structured analysis
`);
console.log("Agent Output:", result.output);
console.log("\nIntermediate Steps:");
result.intermediateSteps.forEach((step, i) => {
console.log(`${i + 1}. ${step.action.tool}: ${step.action.toolInput}`);
console.log(` Result: ${step.observation.substring(0, 200)}...`);
});
} finally {
await agent.cleanup();
}
}
runLangChainExample().catch(console.error);AutoGen Integration
Multi-Agent MCP System with AutoGen
# agents/autogen_mcp_system.py
from typing import Dict, List, Any, Optional
import asyncio
import json
from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager
from mcp_client import MCPClient
class MCPTool:
def __init__(self, client: MCPClient, tool_info: Dict[str, Any], server_name: str):
self.client = client
self.tool_info = tool_info
self.server_name = server_name
self.name = f"{server_name}_{tool_info['name']}"
self.description = tool_info.get('description', '')
async def execute(self, **kwargs) -> str:
try:
result = await self.client.call_tool(
self.tool_info['name'],
kwargs
)
if result.get('content'):
return '\n'.join([
content.get('text', str(content))
for content in result['content']
])
return json.dumps(result, indent=2)
except Exception as e:
return f"Error executing {self.name}: {str(e)}"
class MCPEnabledAgent(AssistantAgent):
def __init__(
self,
name: str,
llm_config: Dict[str, Any],
mcp_servers: List[Dict[str, Any]],
**kwargs
):
super().__init__(name=name, llm_config=llm_config, **kwargs)
self.mcp_clients: Dict[str, MCPClient] = {}
self.mcp_tools: Dict[str, MCPTool] = {}
asyncio.create_task(self._initialize_mcp_servers(mcp_servers))
async def _initialize_mcp_servers(self, servers: List[Dict[str, Any]]):
for server_config in servers:
client = MCPClient(
command=server_config['command'],
args=server_config.get('args', [])
)
await client.connect()
self.mcp_clients[server_config['name']] = client
# Register tools
tools = await client.list_tools()
for tool_info in tools:
tool = MCPTool(client, tool_info, server_config['name'])
self.mcp_tools[tool.name] = tool
async def execute_mcp_tool(self, tool_name: str, **kwargs) -> str:
if tool_name not in self.mcp_tools:
return f"Tool {tool_name} not found. Available tools: {list(self.mcp_tools.keys())}"
return await self.mcp_tools[tool_name].execute(**kwargs)
def get_available_tools(self) -> Dict[str, str]:
return {
name: tool.description
for name, tool in self.mcp_tools.items()
}
class AutoGenMCPOrchestrator:
def __init__(self, openai_config: Dict[str, Any]):
self.openai_config = openai_config
self.agents: List[MCPEnabledAgent] = []
self.user_proxy = None
self.group_chat = None
self.manager = None
def create_specialized_agents(self):
# Database Specialist Agent
database_agent = MCPEnabledAgent(
name="DatabaseAnalyst",
llm_config=self.openai_config,
system_message="""You are a database analyst specialist. Your role is to:
- Query databases efficiently and accurately
- Analyze data patterns and relationships
- Generate insights from structured data
- Provide data summaries and statistics
You have access to database MCP tools. Always use proper SQL practices and explain your queries.""",
mcp_servers=[
{
'name': 'database',
'command': 'node',
'args': ['dist/servers/database-server.js']
}
]
)
# File System Specialist Agent
filesystem_agent = MCPEnabledAgent(
name="FileSystemExpert",
llm_config=self.openai_config,
system_message="""You are a file system specialist. Your role is to:
- Search, read, and analyze files
- Organize and manage file structures
- Extract information from documents
- Handle various file formats
You have access to filesystem MCP tools. Always respect file permissions and security.""",
mcp_servers=[
{
'name': 'filesystem',
'command': 'node',
'args': ['dist/servers/filesystem-server.js']
}
]
)
# API Integration Specialist Agent
api_agent = MCPEnabledAgent(
name="APIIntegrator",
llm_config=self.openai_config,
system_message="""You are an API integration specialist. Your role is to:
- Make HTTP requests to external APIs
- Parse and process API responses
- Handle authentication and rate limiting
- Integrate multiple data sources
You have access to API MCP tools. Always handle errors gracefully and respect API limits.""",
mcp_servers=[
{
'name': 'api',
'command': 'node',
'args': ['dist/servers/api-server.js']
}
]
)
# Synthesis and Reporting Agent
synthesis_agent = MCPEnabledAgent(
name="ReportSynthesizer",
llm_config=self.openai_config,
system_message="""You are a synthesis and reporting specialist. Your role is to:
- Combine information from multiple sources
- Create comprehensive reports and summaries
- Generate insights and recommendations
- Present data in clear, actionable formats
You work with data provided by other specialists to create final deliverables.""",
mcp_servers=[] # No direct MCP access, works with other agents' outputs
)
self.agents = [database_agent, filesystem_agent, api_agent, synthesis_agent]
# User proxy for human interaction
self.user_proxy = UserProxyAgent(
name="UserProxy",
human_input_mode="NEVER",
max_consecutive_auto_reply=0,
code_execution_config=False
)
def setup_group_chat(self):
self.group_chat = GroupChat(
agents=self.agents + [self.user_proxy],
messages=[],
max_round=20,
speaker_selection_method="round_robin"
)
self.manager = GroupChatManager(
groupchat=self.group_chat,
llm_config=self.openai_config,
system_message="""You are managing a team of specialists working together on complex tasks.
Team members:
- DatabaseAnalyst: Handles database queries and data analysis
- FileSystemExpert: Manages file operations and document analysis
- APIIntegrator: Handles external API calls and data integration
- ReportSynthesizer: Creates final reports and summaries
Coordinate their work efficiently to complete user requests."""
)
async def execute_task(self, task: str) -> str:
if not self.group_chat:
self.create_specialized_agents()
self.setup_group_chat()
# Start the conversation
result = await self.user_proxy.initiate_chat(
self.manager,
message=task,
summary_method="reflection_with_llm"
)
return result.summary
# Usage example
async def run_autogen_example():
orchestrator = AutoGenMCPOrchestrator({
"config_list": [
{
"model": "gpt-4-turbo",
"api_key": os.getenv("OPENAI_API_KEY")
}
],
"temperature": 0
})
complex_task = """
I need a comprehensive business intelligence report on our Q4 performance.
Please:
1. Query our sales database for Q4 revenue, customer acquisition, and product performance
2. Find and analyze any existing Q4 reports or documents in our file system
3. Fetch current market data and competitor information from our API sources
4. Synthesize all this information into a strategic report with recommendations
The report should include executive summary, key metrics, trends, and actionable insights.
"""
result = await orchestrator.execute_task(complex_task)
print("Final Report:", result)
if __name__ == "__main__":
asyncio.run(run_autogen_example())CrewAI Integration
MCP-Powered CrewAI Agents
# agents/crewai_mcp_system.py
from crewai import Agent, Task, Crew, Process
from crewai.tools import BaseTool
from typing import Dict, Any, List
import asyncio
import json
class MCPTool(BaseTool):
name: str
description: str
def __init__(self, mcp_client, tool_info: Dict[str, Any], server_name: str):
super().__init__()
self.mcp_client = mcp_client
self.tool_info = tool_info
self.server_name = server_name
self.name = f"{server_name}_{tool_info['name']}"
self.description = tool_info.get('description', '')
def _run(self, **kwargs) -> str:
loop = asyncio.get_event_loop()
return loop.run_until_complete(self._async_run(**kwargs))
async def _async_run(self, **kwargs) -> str:
try:
result = await self.mcp_client.call_tool(
self.tool_info['name'],
kwargs
)
if result.get('content'):
return '\n'.join([
content.get('text', str(content))
for content in result['content']
])
return json.dumps(result, indent=2)
except Exception as e:
return f"Error executing {self.name}: {str(e)}"
class CrewAIMCPSystem:
def __init__(self, llm_config: Dict[str, Any]):
self.llm_config = llm_config
self.mcp_clients = {}
self.mcp_tools = {}
async def initialize_mcp_servers(self, servers: List[Dict[str, Any]]):
for server_config in servers:
client = MCPClient(
command=server_config['command'],
args=server_config.get('args', [])
)
await client.connect()
self.mcp_clients[server_config['name']] = client
# Convert MCP tools to CrewAI tools
tools = await client.list_tools()
for tool_info in tools:
tool = MCPTool(client, tool_info, server_config['name'])
self.mcp_tools[tool.name] = tool
def create_data_research_crew(self):
# Data Collector Agent
data_collector = Agent(
role='Data Collection Specialist',
goal='Gather comprehensive data from databases, files, and APIs',
backstory="""You are an expert at collecting data from various sources.
You have access to database queries, file system operations, and API calls.
Your job is to efficiently gather all relevant information for analysis.""",
verbose=True,
allow_delegation=False,
tools=list(self.mcp_tools.values()),
llm=self.llm_config
)
# Data Analyst Agent
data_analyst = Agent(
role='Data Analysis Expert',
goal='Analyze collected data and extract meaningful insights',
backstory="""You are a skilled data analyst who can identify patterns,
trends, and anomalies in data. You work with raw data to generate
actionable insights and recommendations.""",
verbose=True,
allow_delegation=False,
tools=[], # Works with data provided by collector
llm=self.llm_config
)
# Report Generator Agent
report_generator = Agent(
role='Report Generation Specialist',
goal='Create comprehensive, well-structured reports',
backstory="""You are an expert at creating professional reports
that communicate complex findings clearly. You synthesize analysis
into actionable business intelligence.""",
verbose=True,
allow_delegation=False,
tools=[],
llm=self.llm_config
)
return [data_collector, data_analyst, report_generator]
def create_research_tasks(self, research_query: str):
# Task 1: Data Collection
data_collection_task = Task(
description=f"""
Collect comprehensive data related to: {research_query}
Your tasks:
1. Query relevant databases for structured data
2. Search file systems for related documents and reports
3. Fetch external data from APIs where applicable
4. Organize and summarize all collected data
Provide a structured summary of all data sources and key findings.
""",
expected_output="Structured data collection summary with sources and key data points",
agent=None # Will be assigned to data_collector
)
# Task 2: Data Analysis
analysis_task = Task(
description="""
Analyze the collected data to identify:
1. Key trends and patterns
2. Significant correlations or relationships
3. Anomalies or outliers
4. Performance metrics and KPIs
5. Comparative analysis where applicable
Generate insights and preliminary recommendations.
""",
expected_output="Detailed analysis report with insights and recommendations",
agent=None # Will be assigned to data_analyst
)
# Task 3: Report Generation
report_task = Task(
description="""
Create a comprehensive business report that includes:
1. Executive summary
2. Methodology and data sources
3. Key findings and analysis
4. Visualizations and charts (described)
5. Strategic recommendations
6. Next steps and action items
Ensure the report is professional, clear, and actionable.
""",
expected_output="Professional business report with executive summary and recommendations",
agent=None # Will be assigned to report_generator
)
return [data_collection_task, analysis_task, report_task]
async def execute_research_project(self, research_query: str) -> str:
# Create crew
agents = self.create_data_research_crew()
tasks = self.create_research_tasks(research_query)
# Assign agents to tasks
tasks[0].agent = agents[0] # data_collector
tasks[1].agent = agents[1] # data_analyst
tasks[2].agent = agents[2] # report_generator
# Create and run crew
crew = Crew(
agents=agents,
tasks=tasks,
process=Process.sequential,
verbose=2
)
result = crew.kickoff()
return result
# Advanced multi-crew system
class AdvancedMCPCrewSystem:
def __init__(self, llm_config: Dict[str, Any]):
self.llm_config = llm_config
self.base_system = CrewAIMCPSystem(llm_config)
async def initialize(self, mcp_servers: List[Dict[str, Any]]):
await self.base_system.initialize_mcp_servers(mcp_servers)
def create_specialized_crews(self):
"""Create multiple specialized crews for different domains"""
# Financial Analysis Crew
financial_crew = self._create_financial_crew()
# Marketing Analysis Crew
marketing_crew = self._create_marketing_crew()
# Operations Analysis Crew
operations_crew = self._create_operations_crew()
return {
'financial': financial_crew,
'marketing': marketing_crew,
'operations': operations_crew
}
def _create_financial_crew(self):
financial_analyst = Agent(
role='Financial Data Analyst',
goal='Analyze financial data and generate insights',
backstory="Expert in financial analysis, accounting, and business metrics",
tools=list(self.base_system.mcp_tools.values()),
llm=self.llm_config
)
risk_assessor = Agent(
role='Risk Assessment Specialist',
goal='Identify and assess business risks',
backstory="Specialist in risk analysis and mitigation strategies",
tools=[],
llm=self.llm_config
)
return Crew(
agents=[financial_analyst, risk_assessor],
tasks=[], # Tasks will be created dynamically
process=Process.sequential
)
def _create_marketing_crew(self):
market_researcher = Agent(
role='Market Research Analyst',
goal='Research market trends and customer behavior',
backstory="Expert in market research and customer analysis",
tools=list(self.base_system.mcp_tools.values()),
llm=self.llm_config
)
campaign_analyst = Agent(
role='Campaign Performance Analyst',
goal='Analyze marketing campaign effectiveness',
backstory="Specialist in marketing metrics and campaign optimization",
tools=[],
llm=self.llm_config
)
return Crew(
agents=[market_researcher, campaign_analyst],
tasks=[],
process=Process.sequential
)
def _create_operations_crew(self):
operations_analyst = Agent(
role='Operations Efficiency Analyst',
goal='Analyze operational processes and efficiency',
backstory="Expert in operations management and process optimization",
tools=list(self.base_system.mcp_tools.values()),
llm=self.llm_config
)
quality_assessor = Agent(
role='Quality Assurance Specialist',
goal='Assess quality metrics and standards',
backstory="Specialist in quality management and continuous improvement",
tools=[],
llm=self.llm_config
)
return Crew(
agents=[operations_analyst, quality_assessor],
tasks=[],
process=Process.sequential
)
# Usage example
async def run_crewai_example():
system = AdvancedMCPCrewSystem({
"model": "gpt-4-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
"temperature": 0
})
await system.initialize([
{
'name': 'database',
'command': 'node',
'args': ['dist/servers/database-server.js']
},
{
'name': 'filesystem',
'command': 'node',
'args': ['dist/servers/filesystem-server.js']
},
{
'name': 'api',
'command': 'node',
'args': ['dist/servers/api-server.js']
}
])
# Execute comprehensive business analysis
result = await system.base_system.execute_research_project(
"Q4 2024 Business Performance Analysis across all departments"
)
print("Research Results:")
print(result)
if __name__ == "__main__":
asyncio.run(run_crewai_example())Custom Agent Framework Integration
Building a Custom MCP-Aware Agent
// agents/custom-mcp-agent.ts
import { EventEmitter } from "events";
import { MCPClient } from "../mcp/client.js";
import { OpenAI } from "openai";
interface AgentConfig {
name: string;
systemPrompt: string;
mcpServers: Array<{
name: string;
command: string;
args?: string[];
}>;
llmConfig: {
model: string;
apiKey: string;
temperature?: number;
};
}
interface AgentMemory {
conversationHistory: Array<{
role: "user" | "assistant" | "system";
content: string;
timestamp: Date;
}>;
toolUsage: Array<{
tool: string;
args: any;
result: any;
timestamp: Date;
}>;
context: Record<string, any>;
}
export class CustomMCPAgent extends EventEmitter {
private config: AgentConfig;
private mcpClients: Map<string, MCPClient>;
private availableTools: Map<string, any>;
private openai: OpenAI;
private memory: AgentMemory;
private taskQueue: Array<{
id: string;
task: string;
priority: number;
callback?: (result: string) => void;
}>;
constructor(config: AgentConfig) {
super();
this.config = config;
this.mcpClients = new Map();
this.availableTools = new Map();
this.taskQueue = [];
this.openai = new OpenAI({
apiKey: config.llmConfig.apiKey,
});
this.memory = {
conversationHistory: [],
toolUsage: [],
context: {},
};
this.initializeMCPConnections();
}
private async initializeMCPConnections(): Promise<void> {
for (const serverConfig of this.config.mcpServers) {
try {
const client = new MCPClient(serverConfig.command, serverConfig.args);
await client.connect();
this.mcpClients.set(serverConfig.name, client);
// Load available tools from this server
const tools = await client.listTools();
tools.forEach((tool) => {
this.availableTools.set(`${serverConfig.name}:${tool.name}`, {
...tool,
serverName: serverConfig.name,
client: client,
});
});
this.emit("serverConnected", serverConfig.name);
} catch (error) {
this.emit(
"error",
`Failed to connect to ${serverConfig.name}: ${error.message}`
);
}
}
this.emit("initialized", {
servers: Array.from(this.mcpClients.keys()),
tools: Array.from(this.availableTools.keys()),
});
}
async processTask(task: string): Promise<string> {
this.memory.conversationHistory.push({
role: "user",
content: task,
timestamp: new Date(),
});
// Analyze task and determine required tools
const analysis = await this.analyzeTask(task);
// Create execution plan
const plan = await this.createExecutionPlan(task, analysis);
// Execute plan step by step
const result = await this.executePlan(plan);
this.memory.conversationHistory.push({
role: "assistant",
content: result,
timestamp: new Date(),
});
return result;
}
private async analyzeTask(task: string): Promise<{
complexity: "simple" | "moderate" | "complex";
requiredTools: string[];
estimatedSteps: number;
riskLevel: "low" | "medium" | "high";
}> {
const analysisPrompt = `
Analyze the following task and determine:
1. Complexity level (simple/moderate/complex)
2. Which tools might be needed from: ${Array.from(
this.availableTools.keys()
).join(", ")}
3. Estimated number of steps required
4. Risk level of the operations
Task: ${task}
Respond in JSON format:
{
"complexity": "simple|moderate|complex",
"requiredTools": ["tool1", "tool2"],
"estimatedSteps": number,
"riskLevel": "low|medium|high",
"reasoning": "explanation"
}
`;
const response = await this.openai.chat.completions.create({
model: this.config.llmConfig.model,
messages: [
{ role: "system", content: this.config.systemPrompt },
{ role: "user", content: analysisPrompt },
],
temperature: 0.1,
});
try {
return JSON.parse(response.choices[0].message.content || "{}");
} catch {
// Fallback analysis
return {
complexity: "moderate",
requiredTools: [],
estimatedSteps: 3,
riskLevel: "medium",
};
}
}
private async createExecutionPlan(
task: string,
analysis: any
): Promise<
Array<{
step: number;
action: string;
tool?: string;
args?: any;
expectedOutput: string;
}>
> {
const planningPrompt = `
Create a detailed execution plan for the following task:
Task: ${task}
Analysis: ${JSON.stringify(analysis, null, 2)}
Available tools:
${Array.from(this.availableTools.entries())
.map(([name, tool]) => `${name}: ${tool.description}`)
.join("\n")}
Create a step-by-step plan in JSON format:
{
"steps": [
{
"step": 1,
"action": "description of what to do",
"tool": "tool_name_if_needed",
"args": {"key": "value"},
"expectedOutput": "what we expect to get"
}
]
}
`;
const response = await this.openai.chat.completions.create({
model: this.config.llmConfig.model,
messages: [
{ role: "system", content: this.config.systemPrompt },
{ role: "user", content: planningPrompt },
],
temperature: 0.2,
});
try {
const plan = JSON.parse(
response.choices[0].message.content || '{"steps":[]}'
);
return plan.steps || [];
} catch {
return [
{
step: 1,
action: "Process task directly",
expectedOutput: "Task completion",
},
];
}
}
private async executePlan(
plan: Array<{
step: number;
action: string;
tool?: string;
args?: any;
expectedOutput: string;
}>
): Promise<string> {
const results: string[] = [];
const executionLog: any[] = [];
for (const stepPlan of plan) {
this.emit("stepStarted", stepPlan);
try {
let stepResult: string;
if (stepPlan.tool) {
// Execute tool
stepResult = await this.executeTool(
stepPlan.tool,
stepPlan.args || {}
);
// Log tool usage
this.memory.toolUsage.push({
tool: stepPlan.tool,
args: stepPlan.args,
result: stepResult,
timestamp: new Date(),
});
} else {
// Process step with LLM
stepResult = await this.processWithLLM(
stepPlan.action,
results.join("\n\n")
);
}
results.push(`Step ${stepPlan.step}: ${stepResult}`);
executionLog.push({
step: stepPlan.step,
action: stepPlan.action,
result: stepResult,
success: true,
});
this.emit("stepCompleted", { ...stepPlan, result: stepResult });
} catch (error) {
const errorMsg = `Step ${stepPlan.step} failed: ${error.message}`;
results.push(errorMsg);
executionLog.push({
step: stepPlan.step,
action: stepPlan.action,
error: error.message,
success: false,
});
this.emit("stepFailed", { ...stepPlan, error: error.message });
// Attempt recovery or continue based on error severity
if (this.isCriticalError(error)) {
break;
}
}
}
// Generate final summary
return await this.generateFinalSummary(results, executionLog);
}
private async executeTool(toolName: string, args: any): Promise<string> {
const tool = this.availableTools.get(toolName);
if (!tool) {
throw new Error(`Tool ${toolName} not found`);
}
const result = await tool.client.callTool(tool.name, args);
if (result.content) {
return result.content
.map((content: any) => content.text || JSON.stringify(content))
.join("\n");
}
return JSON.stringify(result);
}
private async processWithLLM(
action: string,
context: string
): Promise<string> {
const response = await this.openai.chat.completions.create({
model: this.config.llmConfig.model,
messages: [
{ role: "system", content: this.config.systemPrompt },
{ role: "user", content: `${action}\n\nContext:\n${context}` },
],
temperature: this.config.llmConfig.temperature || 0.7,
});
return response.choices[0].message.content || "";
}
private async generateFinalSummary(
results: string[],
executionLog: any[]
): Promise<string> {
const summaryPrompt = `
Summarize the execution results and provide insights:
Results:
${results.join("\n\n")}
Execution Log:
${JSON.stringify(executionLog, null, 2)}
Provide a comprehensive summary with:
1. What was accomplished
2. Key findings or insights
3. Any issues encountered
4. Recommendations for next steps
`;
const response = await this.openai.chat.completions.create({
model: this.config.llmConfig.model,
messages: [
{ role: "system", content: this.config.systemPrompt },
{ role: "user", content: summaryPrompt },
],
temperature: 0.3,
});
return response.choices[0].message.content || results.join("\n\n");
}
private isCriticalError(error: any): boolean {
const criticalPatterns = [
/permission denied/i,
/authentication failed/i,
/network unreachable/i,
/database connection failed/i,
];
return criticalPatterns.some((pattern) => pattern.test(error.message));
}
getMemory(): AgentMemory {
return { ...this.memory };
}
async cleanup(): Promise<void> {
for (const client of this.mcpClients.values()) {
await client.disconnect();
}
this.mcpClients.clear();
this.availableTools.clear();
}
}
// Usage example
async function runCustomAgentExample() {
const agent = new CustomMCPAgent({
name: "BusinessAnalysisAgent",
systemPrompt: `You are a sophisticated business analysis agent with access to multiple data sources through MCP servers. You can query databases, analyze files, and fetch external data to provide comprehensive business insights.`,
mcpServers: [
{
name: "database",
command: "node",
args: ["dist/servers/database-server.js"],
},
{
name: "filesystem",
command: "node",
args: ["dist/servers/filesystem-server.js"],
},
],
llmConfig: {
model: "gpt-4-turbo",
apiKey: process.env.OPENAI_API_KEY!,
temperature: 0.7,
},
});
// Set up event listeners
agent.on("initialized", (info) => {
console.log("Agent initialized:", info);
});
agent.on("stepStarted", (step) => {
console.log(`Starting step ${step.step}: ${step.action}`);
});
agent.on("stepCompleted", (step) => {
console.log(`Completed step ${step.step}`);
});
try {
const result = await agent.processTask(`
Analyze our customer database to identify:
1. Top 10 customers by revenue
2. Customer acquisition trends over the last 6 months
3. Product performance metrics
4. Geographic distribution of customers
Also search for any existing customer analysis reports in our file system
and incorporate those findings into a comprehensive summary.
`);
console.log("Final Result:");
console.log(result);
} finally {
await agent.cleanup();
}
}
runCustomAgentExample().catch(console.error);Agent Orchestration Patterns
MCP Router for Multi-Agent Systems
// orchestration/mcp-router.ts
export class MCPRouter {
private mcpClients: Map<string, MCPClient> = new Map();
private loadBalancer: LoadBalancer;
private circuitBreaker: CircuitBreaker;
constructor() {
this.loadBalancer = new LoadBalancer();
this.circuitBreaker = new CircuitBreaker();
}
async registerMCPServer(
name: string,
config: {
command: string;
args?: string[];
weight?: number;
healthCheck?: () => Promise<boolean>;
}
): Promise<void> {
const client = new MCPClient(config.command, config.args);
await client.connect();
this.mcpClients.set(name, client);
this.loadBalancer.addServer(name, config.weight || 1);
if (config.healthCheck) {
this.circuitBreaker.addHealthCheck(name, config.healthCheck);
}
}
async routeToolCall(
toolName: string,
args: any,
preferredServer?: string
): Promise<any> {
let targetServer = preferredServer;
if (!targetServer) {
// Find servers that have this tool
const serversWithTool = await this.findServersWithTool(toolName);
if (serversWithTool.length === 0) {
throw new Error(`No servers found with tool: ${toolName}`);
}
// Use load balancer to select server
targetServer = this.loadBalancer.selectServer(serversWithTool);
}
if (!targetServer) {
throw new Error("No available servers");
}
// Check circuit breaker
if (!this.circuitBreaker.isAvailable(targetServer)) {
throw new Error(`Server ${targetServer} is currently unavailable`);
}
try {
const client = this.mcpClients.get(targetServer);
if (!client) {
throw new Error(`Server ${targetServer} not found`);
}
const result = await client.callTool(toolName, args);
this.circuitBreaker.recordSuccess(targetServer);
return result;
} catch (error) {
this.circuitBreaker.recordFailure(targetServer);
throw error;
}
}
private async findServersWithTool(toolName: string): Promise<string[]> {
const serversWithTool: string[] = [];
for (const [serverName, client] of this.mcpClients) {
try {
const tools = await client.listTools();
if (tools.some((tool) => tool.name === toolName)) {
serversWithTool.push(serverName);
}
} catch (error) {
console.warn(`Failed to list tools for server ${serverName}:`, error);
}
}
return serversWithTool;
}
}
class LoadBalancer {
private servers: Map<string, { weight: number; currentLoad: number }> =
new Map();
addServer(name: string, weight: number = 1): void {
this.servers.set(name, { weight, currentLoad: 0 });
}
selectServer(availableServers: string[]): string | null {
const candidates = availableServers
.map((name) => ({
name,
server: this.servers.get(name),
}))
.filter((candidate) => candidate.server)
.map((candidate) => ({
name: candidate.name,
score: candidate.server!.weight / (candidate.server!.currentLoad + 1),
}))
.sort((a, b) => b.score - a.score);
if (candidates.length === 0) return null;
const selected = candidates[0].name;
const server = this.servers.get(selected)!;
server.currentLoad++;
return selected;
}
releaseServer(name: string): void {
const server = this.servers.get(name);
if (server && server.currentLoad > 0) {
server.currentLoad--;
}
}
}
class CircuitBreaker {
private states: Map<
string,
{
state: "CLOSED" | "OPEN" | "HALF_OPEN";
failures: number;
lastFailureTime: Date | null;
healthCheck?: () => Promise<boolean>;
}
> = new Map();
private readonly failureThreshold = 5;
private readonly recoveryTimeout = 60000; // 1 minute
addHealthCheck(
serverName: string,
healthCheck: () => Promise<boolean>
): void {
this.states.set(serverName, {
state: "CLOSED",
failures: 0,
lastFailureTime: null,
healthCheck,
});
}
isAvailable(serverName: string): boolean {
const state = this.states.get(serverName);
if (!state) return true;
if (state.state === "CLOSED") return true;
if (state.state === "OPEN") {
// Check if recovery timeout has passed
if (
state.lastFailureTime &&
Date.now() - state.lastFailureTime.getTime() > this.recoveryTimeout
) {
state.state = "HALF_OPEN";
return true;
}
return false;
}
if (state.state === "HALF_OPEN") return true;
return false;
}
recordSuccess(serverName: string): void {
const state = this.states.get(serverName);
if (!state) return;
state.failures = 0;
state.state = "CLOSED";
state.lastFailureTime = null;
}
recordFailure(serverName: string): void {
const state = this.states.get(serverName);
if (!state) return;
state.failures++;
state.lastFailureTime = new Date();
if (state.failures >= this.failureThreshold) {
state.state = "OPEN";
}
}
}Key Takeaways
- Framework Integration: MCP can be integrated with popular agent frameworks like LangChain, AutoGen, and CrewAI
- Tool Wrapping: Convert MCP tools to framework-specific tool interfaces for seamless integration
- Multi-Agent Orchestration: Use MCP routers for coordinating multiple agents with different MCP servers
- Specialized Agents: Create domain-specific agents that leverage different MCP capabilities
- Error Handling: Implement robust error handling and fallback mechanisms for production deployments
- Monitoring: Add comprehensive monitoring and observability for agent-MCP interactions
- Load Balancing: Distribute tool execution across multiple MCP server instances
Next Steps
Chapter 5 will cover Production Deployment, including scaling strategies, monitoring, security considerations, and best practices for running MCP-powered systems in production environments.