Add AI GTM email outreach app

This commit is contained in:
Madhu
2025-08-12 00:47:17 +05:30
parent 3de66d37b5
commit eeb32c6d44
7 changed files with 908 additions and 954 deletions


@@ -0,0 +1,904 @@
import json
import os
import streamlit as st
from datetime import datetime
from textwrap import dedent
from typing import Dict, Iterator, List, Optional, Literal
from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.storage.sqlite import SqliteStorage
from agno.tools.exa import ExaTools
from agno.utils.log import logger
from agno.utils.pprint import pprint_run_response
from agno.workflow import RunResponse, Workflow
from pydantic import BaseModel, Field
# Initialize API keys from environment or empty defaults
if 'EXA_API_KEY' not in st.session_state:
st.session_state.EXA_API_KEY = os.getenv("EXA_API_KEY", "")
if 'OPENAI_API_KEY' not in st.session_state:
st.session_state.OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
# Set environment variables
os.environ["EXA_API_KEY"] = st.session_state.EXA_API_KEY
os.environ["OPENAI_API_KEY"] = st.session_state.OPENAI_API_KEY
# Demo mode
# - set to True to print email to console
# - set to False to send to yourself
DEMO_MODE = True
today = datetime.now().strftime("%Y-%m-%d")
# Example leads - Replace with your actual targets
leads: Dict[str, Dict[str, str]] = {
"Notion": {
"name": "Notion",
"website": "https://www.notion.so",
"contact_name": "Ivan Zhao",
"position": "CEO",
},
# Add more companies as needed
}
# Updated sender details for an AI analytics company
sender_details_dict: Dict[str, str] = {
"name": "Sarah Chen",
"email": "your.email@company.com", # Your email goes here
"organization": "Data Consultants Inc",
"service_offered": "We help build data products and offer data consulting services",
"calendar_link": "https://calendly.com/data-consultants-inc",
"linkedin": "https://linkedin.com/in/your-profile",
"phone": "+1 (555) 123-4567",
"website": "https://www.data-consultants.com",
}
DEPARTMENT_TEMPLATES = {
"GTM (Sales & Marketing)": {
"Software Solution": """\
Hey [RECIPIENT_NAME],
I noticed [COMPANY_NAME]'s impressive [GTM_INITIATIVE] and your role in scaling [SPECIFIC_ACHIEVEMENT]. Your approach to [SALES_STRATEGY] caught my attention.
[PRODUCT_VALUE_FOR_GTM]
[GTM_SPECIFIC_BENEFIT]
Would love to show you how this could work for your team: [CALENDAR_LINK]
Best,
[SIGNATURE]\
""",
"Consulting Services": """\
Hey [RECIPIENT_NAME],
Your team's recent success with [CAMPAIGN_NAME] is impressive, particularly the [SPECIFIC_METRIC].
[CONSULTING_VALUE_PROP]
[GTM_IMPROVEMENT_POTENTIAL]
Here's my calendar if you'd like to explore this: [CALENDAR_LINK]
Best,
[SIGNATURE]\
"""
},
"Human Resources": {
"Software Solution": """\
Hey [RECIPIENT_NAME],
I've been following [COMPANY_NAME]'s growth and noticed your focus on [HR_INITIATIVE]. Your approach to [SPECIFIC_HR_PROGRAM] stands out.
[HR_TOOL_VALUE_PROP]
[HR_SPECIFIC_BENEFIT]
Would you be open to seeing how this could help your HR initiatives? [CALENDAR_LINK]
Best,
[SIGNATURE]\
""",
"Consulting Services": """\
Hey [RECIPIENT_NAME],
I've been following [COMPANY_NAME]'s journey in [INDUSTRY], and your recent [ACHIEVEMENT] caught my attention. Your approach to [SPECIFIC_FOCUS] aligns perfectly with what we're building.
[PARTNERSHIP_VALUE_PROP]
[MUTUAL_BENEFIT]
Would love to explore potential synergies over a quick call: [CALENDAR_LINK]
Best,
[SIGNATURE]\
""",
"Investment Opportunity": """\
Hey [RECIPIENT_NAME],
Your work at [COMPANY_NAME] in [SPECIFIC_FOCUS] is impressive, especially [RECENT_ACHIEVEMENT].
[INVESTMENT_THESIS]
[UNIQUE_VALUE_ADD]
Here's my calendar if you'd like to discuss: [CALENDAR_LINK]
Best,
[SIGNATURE]\
"""
},
"Marketing Professional": {
"Product Demo": """\
Hey [RECIPIENT_NAME],
I noticed [COMPANY_NAME]'s recent [MARKETING_INITIATIVE] and was impressed by [SPECIFIC_DETAIL].
[PRODUCT_VALUE_PROP]
[BENEFIT_TO_MARKETING]
Would you be open to a quick demo? Here's my calendar: [CALENDAR_LINK]
Best,
[SIGNATURE]\
""",
"Service Offering": """\
Hey [RECIPIENT_NAME],
Saw your team's work on [RECENT_CAMPAIGN] - great execution on [SPECIFIC_ELEMENT].
[SERVICE_VALUE_PROP]
[MARKETING_BENEFIT]
Here's my calendar if you'd like to explore this: [CALENDAR_LINK]
Best,
[SIGNATURE]\
"""
},
"B2B Sales Representative": {
"Product Demo": """\
Hey [RECIPIENT_NAME],
Noticed your team at [COMPANY_NAME] is scaling [SALES_FOCUS]. Your approach to [SPECIFIC_STRATEGY] is spot-on.
[PRODUCT_VALUE_PROP]
[SALES_BENEFIT]
Would you be interested in seeing how this works? Here's my calendar: [CALENDAR_LINK]
Best,
[SIGNATURE]\
""",
"Service Offering": """\
Hey [RECIPIENT_NAME],
Your sales team's success with [RECENT_WIN] caught my attention. Particularly impressed by [SPECIFIC_ACHIEVEMENT].
[SERVICE_VALUE_PROP]
[SALES_IMPROVEMENT]
Here's my calendar if you'd like to discuss: [CALENDAR_LINK]
Best,
[SIGNATURE]\
"""
}
}
COMPANY_CATEGORIES = {
"SaaS/Technology Companies": {
"description": "Software, cloud services, and tech platforms",
"typical_roles": ["CTO", "Head of Engineering", "VP of Product", "Engineering Manager", "Tech Lead"]
},
"E-commerce/Retail": {
"description": "Online retail, marketplaces, and D2C brands",
"typical_roles": ["Head of Digital", "E-commerce Manager", "Marketing Director", "Operations Head"]
},
"Financial Services": {
"description": "Banks, fintech, insurance, and investment firms",
"typical_roles": ["CFO", "Head of Innovation", "Risk Manager", "Product Manager"]
},
"Healthcare/Biotech": {
"description": "Healthcare providers, biotech, and health tech",
"typical_roles": ["Medical Director", "Head of R&D", "Clinical Manager", "Healthcare IT Lead"]
},
"Manufacturing/Industrial": {
"description": "Manufacturing, industrial automation, and supply chain",
"typical_roles": ["Operations Director", "Plant Manager", "Supply Chain Head", "Quality Manager"]
}
}
class OutreachConfig(BaseModel):
"""Configuration for email outreach"""
company_category: str = Field(..., description="Type of companies to target")
target_departments: List[str] = Field(
...,
description="Departments to target (e.g., GTM, HR, Engineering)"
)
service_type: Literal[
"Software Solution",
"Consulting Services",
"Professional Services",
"Technology Platform",
"Custom Development"
] = Field(..., description="Type of service being offered")
company_size_preference: Literal["Startup (1-50)", "SMB (51-500)", "Enterprise (500+)", "All Sizes"] = Field(
default="All Sizes",
description="Preferred company size"
)
personalization_level: Literal["Basic", "Medium", "Deep"] = Field(
default="Deep",
description="Level of personalization"
)
class ContactInfo(BaseModel):
"""Contact information for decision makers"""
name: str = Field(..., description="Contact's full name")
title: str = Field(..., description="Job title/position")
email: Optional[str] = Field(None, description="Email address")
linkedin: Optional[str] = Field(None, description="LinkedIn profile URL")
company: str = Field(..., description="Company name")
department: Optional[str] = Field(None, description="Department")
background: Optional[str] = Field(None, description="Professional background")
class CompanyInfo(BaseModel):
"""
Stores in-depth data about a company gathered during the research phase.
"""
# Basic Information
company_name: str = Field(..., description="Company name")
website_url: str = Field(..., description="Company website URL")
# Business Details
industry: Optional[str] = Field(None, description="Primary industry")
core_business: Optional[str] = Field(None, description="Main business focus")
business_model: Optional[str] = Field(None, description="B2B, B2C, etc.")
# Marketing Information
motto: Optional[str] = Field(None, description="Company tagline/slogan")
value_proposition: Optional[str] = Field(None, description="Main value proposition")
target_audience: Optional[List[str]] = Field(
None, description="Target customer segments"
)
# Company Metrics
company_size: Optional[str] = Field(None, description="Employee count range")
founded_year: Optional[int] = Field(None, description="Year founded")
locations: Optional[List[str]] = Field(None, description="Office locations")
# Technical Details
technologies: Optional[List[str]] = Field(None, description="Technology stack")
integrations: Optional[List[str]] = Field(None, description="Software integrations")
# Market Position
competitors: Optional[List[str]] = Field(None, description="Main competitors")
unique_selling_points: Optional[List[str]] = Field(
None, description="Key differentiators"
)
market_position: Optional[str] = Field(None, description="Market positioning")
# Social Proof
customers: Optional[List[str]] = Field(None, description="Notable customers")
case_studies: Optional[List[str]] = Field(None, description="Success stories")
awards: Optional[List[str]] = Field(None, description="Awards and recognition")
# Recent Activity
recent_news: Optional[List[str]] = Field(None, description="Recent news/updates")
blog_topics: Optional[List[str]] = Field(None, description="Recent blog topics")
# Pain Points & Opportunities
challenges: Optional[List[str]] = Field(None, description="Potential pain points")
growth_areas: Optional[List[str]] = Field(None, description="Growth opportunities")
# Contact Information
email_address: Optional[str] = Field(None, description="Contact email")
phone: Optional[str] = Field(None, description="Contact phone")
social_media: Optional[Dict[str, str]] = Field(
None, description="Social media links"
)
# Additional Fields
pricing_model: Optional[str] = Field(None, description="Pricing strategy and tiers")
user_base: Optional[str] = Field(None, description="Estimated user base size")
key_features: Optional[List[str]] = Field(None, description="Main product features")
integration_ecosystem: Optional[List[str]] = Field(
None, description="Integration partners"
)
funding_status: Optional[str] = Field(
None, description="Latest funding information"
)
growth_metrics: Optional[Dict[str, str]] = Field(
None, description="Key growth indicators"
)
class PersonalisedEmailGenerator(Workflow):
"""
Automated B2B outreach system that:
1. Discovers companies using Exa search based on criteria
2. Finds contact details for decision makers at those companies
3. Researches company details and pain points
4. Generates personalized cold emails for B2B outreach
This workflow is designed to automate the entire prospecting process
from company discovery to personalized email generation.
"""
description: str = dedent("""\
AI-Powered B2B Outreach Workflow:
--------------------------------------------------------
1. Discover Target Companies (Exa Search)
2. Find Decision Maker Contacts
3. Research Company Intelligence
4. Generate Personalized Emails
--------------------------------------------------------
Fully automated prospecting pipeline for B2B outreach.
""")
company_finder: Agent = Agent(
model=OpenAIChat(id="gpt-5"),
tools=[ExaTools(api_key=os.environ["EXA_API_KEY"])],
description="Expert at finding companies that match specific criteria using web search",
instructions=dedent("""\
You are a company discovery specialist. Your job is to find companies that match the given criteria.
Search for companies based on:
- Industry/sector
- Company size
- Geographic location
- Business model
- Technology stack
- Recent funding/growth
For each company found, provide:
- Company name
- Website URL
- Brief description
- Industry
- Estimated size
- Location
Focus on finding companies that would be good prospects for the specified service offering.
Look for companies showing signs of growth, funding, or expansion.
"""),
)
contact_finder: Agent = Agent(
model=OpenAIChat(id="gpt-5"),
tools=[ExaTools(api_key=os.environ["EXA_API_KEY"])],
description="Expert at finding contact information for decision makers at companies",
instructions=dedent("""\
You are a contact research specialist. Find decision makers and their contact information.
For each company, search for:
- Key decision makers in target departments
- Their email addresses
- LinkedIn profiles
- Professional backgrounds
- Current role and responsibilities
Focus on finding people in roles like:
- CEO, CTO, VP of Engineering (for tech solutions)
- CMO, VP Marketing, Growth Lead (for marketing solutions)
- VP Sales, Sales Director (for sales solutions)
- HR Director, People Ops (for HR solutions)
Provide verified contact information when possible.
"""),
)
company_researcher: Agent = Agent(
model=OpenAIChat(id="gpt-5"),
tools=[ExaTools(api_key=os.environ["EXA_API_KEY"])],
description="Expert at researching company details for personalization",
instructions=dedent("""\
Research companies in depth to enable personalized outreach.
Analyze:
- Company website and messaging
- Recent news and updates
- Product/service offerings
- Technology stack
- Growth indicators
- Pain points and challenges
- Recent achievements
- Market position
Focus on insights that would be relevant for B2B outreach:
- Scaling challenges
- Technology needs
- Market expansion
- Competitive positioning
- Recent wins or milestones
"""),
response_model=CompanyInfo,
)
email_creator: Agent = Agent(
model=OpenAIChat(id="gpt-5"),
description=dedent("""\
You are writing for a friendly, empathetic 20-year-old sales rep whose
style is cool, concise, and respectful. Tone is casual yet professional.
- Be polite but natural, using simple language.
- Never sound robotic or use big cliché words like "delve", "synergy" or "revolutionary."
- Clearly address problems the prospect might be facing and how we solve them.
- Keep paragraphs short and friendly, with a natural voice.
- End on a warm, upbeat note, showing willingness to help.\
"""),
instructions=dedent("""\
Please craft a highly personalized email that has:
1. A simple, personal subject line referencing the problem or opportunity.
2. At least one area for improvement or highlight from research.
3. A quick explanation of how we can help them (no heavy jargon).
4. References a known challenge from the research.
5. Avoid words like "delve", "explore", "synergy", "amplify", "game changer", "revolutionary", "breakthrough".
6. Use first-person language ("I") naturally.
7. Maintain a 20-year-old's friendly style—brief and to the point.
8. Avoid placing the recipient's name in the subject line.
Use the appropriate template based on the target professional type and outreach purpose.
Ensure the final tone feels personal and conversation-like, not automatically generated.
----------------------------------------------------------------------
"""),
markdown=False,
add_datetime_to_instructions=True,
)
def get_cached_data(self, cache_key: str) -> Optional[dict]:
"""Retrieve cached data"""
logger.info(f"Checking cache for: {cache_key}")
return self.session_state.get("cache", {}).get(cache_key)
def cache_data(self, cache_key: str, data: dict):
"""Cache data"""
logger.info(f"Caching data for: {cache_key}")
self.session_state.setdefault("cache", {})
self.session_state["cache"][cache_key] = data
self.write_to_storage()
def run(
self,
config: OutreachConfig,
sender_details: Dict[str, str],
num_companies: int = 5,
use_cache: bool = True,
) -> Iterator[RunResponse]:
"""
Automated B2B outreach workflow:
1. Discover companies using Exa search based on criteria
2. Find decision maker contacts for each company
3. Research company details for personalization
4. Generate personalized emails
"""
logger.info("Starting automated B2B outreach workflow...")
# Step 1: Discover companies
logger.info("🔍 Discovering target companies...")
search_query = f"""
Find {num_companies} {config.company_category} companies that would be good prospects for {config.service_type}.
Company criteria:
- Industry: {config.company_category}
- Size: {config.company_size_preference}
- Target departments: {', '.join(config.target_departments)}
Look for companies showing growth, recent funding, or expansion.
"""
companies_response = self.company_finder.run(search_query)
if not companies_response or not companies_response.content:
logger.error("No companies found")
return
# Parse companies from response
companies_text = companies_response.content
logger.info(f"Found companies: {companies_text[:200]}...")
# Step 2: For each company, find contacts and research
for i in range(num_companies):
try:
logger.info(f"Processing company #{i+1}")
# Extract company info from the response
company_search = f"Extract company #{i+1} details from: {companies_text}"
# Step 3: Find decision maker contacts
logger.info("👥 Finding decision maker contacts...")
contacts_query = f"""
Find decision makers at company #{i+1} from this list: {companies_text}
Focus on roles in: {', '.join(config.target_departments)}
Find their email addresses and LinkedIn profiles.
"""
contacts_response = self.contact_finder.run(contacts_query)
if not contacts_response or not contacts_response.content:
logger.warning(f"No contacts found for company #{i+1}")
continue
# Step 4: Research company details
logger.info("🔬 Researching company details...")
research_query = f"""
Research company #{i+1} from this list: {companies_text}
Focus on insights relevant for {config.service_type} outreach.
Find pain points related to {', '.join(config.target_departments)}.
"""
research_response = self.company_researcher.run(research_query)
if not research_response or not research_response.content:
logger.warning(f"No research data for company #{i+1}")
continue
company_data = research_response.content
if not isinstance(company_data, CompanyInfo):
logger.warning(f"Invalid research data format for company #{i+1}")
continue
# Step 5: Generate personalized email
logger.info("✉️ Generating personalized email...")
# Get appropriate template based on target departments
template_dept = config.target_departments[0] if config.target_departments else "GTM (Sales & Marketing)"
if template_dept in DEPARTMENT_TEMPLATES and config.service_type in DEPARTMENT_TEMPLATES[template_dept]:
template = DEPARTMENT_TEMPLATES[template_dept][config.service_type]
else:
template = DEPARTMENT_TEMPLATES["GTM (Sales & Marketing)"]["Software Solution"]
email_context = json.dumps(
{
"template": template,
"company_info": company_data.model_dump(),
"contacts_info": contacts_response.content,
"sender_details": sender_details,
"target_departments": config.target_departments,
"service_type": config.service_type,
"personalization_level": config.personalization_level
},
indent=4,
)
email_response = self.email_creator.run(
f"Generate a personalized email using this context:\n{email_context}"
)
if not email_response or not email_response.content:
logger.warning(f"No email generated for company #{i+1}")
continue
yield RunResponse(content={
"company_name": company_data.company_name,
"email": email_response.content,
"company_data": company_data.model_dump(),
"contacts": contacts_response.content,
"step": f"Company {i+1}/{num_companies}"
})
except Exception as e:
logger.error(f"Error processing company #{i+1}: {e}")
continue
def create_streamlit_ui():
"""Create the Streamlit user interface"""
st.title("🚀 Automated B2B Email Outreach Generator")
st.markdown("""
**Fully automated prospecting pipeline**: Discovers companies, finds decision makers,
and generates personalized emails using AI research agents.
""")
# Step 1: Target Company Category Selection
st.header("1⃣ Target Company Discovery")
col1, col2 = st.columns([2, 1])
with col1:
selected_category = st.selectbox(
"What type of companies should we target?",
options=list(COMPANY_CATEGORIES.keys()),
key="company_category"
)
st.info(f"📌 {COMPANY_CATEGORIES[selected_category]['description']}")
st.markdown("### Typical Decision Makers We'll Find:")
for role in COMPANY_CATEGORIES[selected_category]['typical_roles']:
st.markdown(f"- {role}")
with col2:
st.markdown("### Company Size Filter")
company_size = st.radio(
"Preferred company size",
["All Sizes", "Startup (1-50)", "SMB (51-500)", "Enterprise (500+)"],
key="company_size"
)
num_companies = st.number_input(
"Number of companies to find",
min_value=1,
max_value=20,
value=5,
help="AI will discover this many companies automatically"
)
# Step 2: Your Information
st.header("2⃣ Your Contact Information")
col3, col4 = st.columns(2)
with col3:
st.subheader("Required Information")
sender_details = {
"name": st.text_input("Your Name *", key="sender_name"),
"email": st.text_input("Your Email *", key="sender_email"),
"organization": st.text_input("Your Organization *", key="sender_org")
}
with col4:
st.subheader("Optional Information")
sender_details.update({
"linkedin": st.text_input("LinkedIn Profile (optional)", key="sender_linkedin", placeholder="https://linkedin.com/in/yourname"),
"phone": st.text_input("Phone Number (optional)", key="sender_phone", placeholder="+1 (555) 123-4567"),
"website": st.text_input("Company Website (optional)", key="sender_website", placeholder="https://yourcompany.com"),
"calendar_link": st.text_input("Calendar Link (optional)", key="sender_calendar", placeholder="https://calendly.com/yourname")
})
# Service description
sender_details["service_offered"] = st.text_area(
"Describe your offering *",
height=100,
key="service_description",
help="Explain what you offer and how it helps businesses",
placeholder="We help companies build custom AI solutions that automate workflows and improve efficiency..."
)
# Step 3: Service Type and Targeting
st.header("3⃣ Outreach Configuration")
col5, col6 = st.columns(2)
with col5:
service_type = st.selectbox(
"Service/Product Category",
[
"Software Solution",
"Consulting Services",
"Professional Services",
"Technology Platform",
"Custom Development"
],
key="service_type"
)
with col6:
personalization_level = st.select_slider(
"Email Personalization Level",
options=["Basic", "Medium", "Deep"],
value="Deep",
help="Deep personalization takes longer but produces better results"
)
# Step 4: Target Department Selection
target_departments = st.multiselect(
"Which departments should we target?",
[
"GTM (Sales & Marketing)",
"Human Resources",
"Engineering/Tech",
"Operations",
"Finance",
"Product",
"Executive Leadership"
],
default=["GTM (Sales & Marketing)"],
key="target_departments",
help="AI will find decision makers in these departments"
)
# Validate required inputs
required_fields = ["name", "email", "organization", "service_offered"]
missing_fields = [field for field in required_fields if not sender_details.get(field)]
if missing_fields:
st.error(f"Please fill in required fields: {', '.join(missing_fields)}")
st.stop()
if not target_departments:
st.error("Please select at least one target department")
st.stop()
if not selected_category:
st.error("Please select a company category")
st.stop()
if not service_type:
st.error("Please select a service type")
st.stop()
# Create and return configuration
outreach_config = OutreachConfig(
company_category=selected_category,
target_departments=target_departments,
service_type=service_type,
company_size_preference=company_size,
personalization_level=personalization_level
)
return outreach_config, sender_details, num_companies
def main():
"""
Main entry point for running the automated B2B outreach workflow.
"""
try:
# Set page config must be the first Streamlit command
st.set_page_config(
page_title="Automated B2B Email Outreach",
layout="wide",
initial_sidebar_state="expanded"
)
# API Keys in Sidebar
st.sidebar.header("🔑 API Configuration")
# Update API keys from sidebar
st.session_state.EXA_API_KEY = st.sidebar.text_input(
"Exa API Key *",
value=st.session_state.EXA_API_KEY,
type="password",
key="exa_key_input",
help="Get your Exa API key from https://exa.ai"
)
st.session_state.OPENAI_API_KEY = st.sidebar.text_input(
"OpenAI API Key *",
value=st.session_state.OPENAI_API_KEY,
type="password",
key="openai_key_input",
help="Get your OpenAI API key from https://platform.openai.com"
)
# Update environment variables
os.environ["EXA_API_KEY"] = st.session_state.EXA_API_KEY
os.environ["OPENAI_API_KEY"] = st.session_state.OPENAI_API_KEY
# Validate API keys
if not st.session_state.EXA_API_KEY or not st.session_state.OPENAI_API_KEY:
st.sidebar.error("⚠️ Both API keys are required to run the application")
else:
st.sidebar.success("✅ API keys configured")
# Add guidance about API keys
st.sidebar.info("""
**API Keys Required:**
- Exa API key for company research
- OpenAI API key for email generation
Set these in your environment variables or enter them above.
""")
# Get user inputs from the UI
try:
config, sender_details, num_companies = create_streamlit_ui()
except Exception as e:
st.error(f"Configuration error: {str(e)}")
st.stop()
# Generate Emails Section
st.header("4⃣ Generate Outreach Campaign")
st.info(f"""
**Ready to launch automated prospecting:**
- Target: {config.company_category} companies ({config.company_size_preference})
- Departments: {', '.join(config.target_departments)}
- Service: {config.service_type}
- Companies to find: {num_companies}
""")
if st.button("🚀 Start Automated Campaign", key="generate_button", type="primary"):
# Check if API keys are configured
if not st.session_state.EXA_API_KEY or not st.session_state.OPENAI_API_KEY:
st.error("❌ Please configure both API keys before starting the campaign")
st.stop()
try:
# Progress tracking
progress_bar = st.progress(0)
status_text = st.empty()
results_container = st.container()
with st.spinner("Initializing AI research agents..."):
workflow = PersonalisedEmailGenerator(
session_id="streamlit-email-generator",
storage=SqliteStorage(
table_name="email_generator_workflows",
db_file="tmp/agno_workflows.db"
)
)
status_text.text("🔍 Discovering companies and generating emails...")
# Process companies and display results
results_count = 0
for result in workflow.run(
config=config,
sender_details=sender_details,
num_companies=num_companies,
use_cache=True
):
results_count += 1
progress = results_count / num_companies
progress_bar.progress(progress)
status_text.text(f"{result.content['step']} completed")
with results_container:
st.subheader(f"📧 Email for {result.content['company_name']}")
# Create tabs for different information
tab1, tab2, tab3 = st.tabs(["Generated Email", "Company Research", "Contacts Found"])
with tab1:
st.text_area(
"Personalized Email",
result.content['email'],
height=400,
key=f"email_{result.content['company_name']}_{results_count}"
)
# Copy button
if st.button(f"📋 Copy Email", key=f"copy_{result.content['company_name']}_{results_count}"):
st.success("Email content copied!")
with tab2:
st.json(result.content['company_data'])
with tab3:
st.text(result.content['contacts'])
st.markdown("---")
# Final status
if results_count > 0:
progress_bar.progress(1.0)
status_text.text(f"🎉 Campaign complete! Generated {results_count} personalized emails")
st.balloons()
else:
st.error("No emails were generated. Please try adjusting your criteria.")
except Exception as e:
st.error(f"Campaign failed: {str(e)}")
logger.error(f"Workflow failed: {e}")
st.exception(e)
st.sidebar.markdown("### About")
st.sidebar.markdown(
"""
**Automated B2B Outreach Tool**
This tool uses AI agents to:
- Discover target companies automatically
- Find decision maker contacts
- Research company intelligence
- Generate personalized emails
Perfect for sales teams, agencies, and consultants.
"""
)
except Exception as e:
logger.error(f"Workflow failed: {e}")
st.error(f"An error occurred: {str(e)}")
raise
if __name__ == "__main__":
main()


@@ -0,0 +1,4 @@
agno>=0.1.0
streamlit>=1.32.0
pydantic>=2.0.0
openai>=1.0.0


@@ -1,481 +0,0 @@
# Enterprise MCP AI Agent Team
A production-grade multi-agent system built with Google ADK that orchestrates knowledge management across local files and SaaS platforms using MCP (Model Context Protocol).
## Overview
This system combines:
- **Local Filesystem MCP Server** - for accessing and analyzing local documents
- **Notion MCP Server** - for managing Notion workspaces and content
- **Composio MCP Server** - for GitHub and Figma integration
- **Intelligent Router/Orchestrator** - context-aware task delegation with state management
- **4 Specialized AI Agents** - each handling specific platform capabilities
## Architecture
```
┌─────────────────────────────────────────────────────────────┐
│ Enterprise MCP AI Agent Team │
│ (Coordinator/Dispatcher Pattern) │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────────┐ ┌─────────────────┐ ┌──────────────┐ │
│ │ File Analysis │ │ Notion Agent │ │ GitHub Agent │ │
│ │ AI Agent │ │ (Optional) │ │ (Optional) │ │
│ └─────────────────┘ └─────────────────┘ └──────────────┘ │
│ │ │ │ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌──────────────┐ │
│ │ Filesystem MCP │ │ Notion MCP │ │ Composio MCP │ │
│ │ Server │ │ Server │ │ Server │ │
│ └─────────────────┘ └─────────────────┘ └──────────────┘ │
│ │ │ │ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌──────────────┐ │
│ │ Local Documents │ │ Notion Pages & │ │ GitHub Repos │ │
│ │ (PDF, DOC, XLS) │ │ Databases │ │ & Issues │ │
│ └─────────────────┘ └─────────────────┘ └──────────────┘ │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Figma Agent │ │ Composio MCP │ │
│ │ (Optional) │ │ Server │ │
│ └─────────────────┘ └─────────────────┘ │
│ │ │ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Figma Files & │ │ Figma Designs & │ │
│ │ Designs │ │ Assets │ │
│ └─────────────────┘ └─────────────────┘ │
└─────────────────────────────────────────────────────────────┘
```
### **Routing Patterns:**
1. **Coordinator/Dispatcher Pattern**: Intelligent routing based on query analysis
2. **LLM-Driven Delegation**: Automatic agent selection using `transfer_to_agent()`
3. **Explicit Invocation**: Direct agent calls using `AgentTool` (see the sketch after this list)
4. **Graceful Degradation**: System works with any combination of available agents
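For illustration, here is a minimal sketch of patterns 2 and 3, assuming `google.adk`'s `LlmAgent` and an `AgentTool` import from `google.adk.tools.agent_tool`; the coordinator, sub-agent names, and instructions are hypothetical stand-ins for the agents defined in `agent.py`.

```python
# Hypothetical sketch: LLM-driven delegation (sub_agents) vs. explicit invocation (AgentTool).
from google.adk.agents import LlmAgent
from google.adk.tools.agent_tool import AgentTool

notion_agent = LlmAgent(
    name="NotionAgent",
    model="gemini-2.0-flash",
    description="Manages Notion pages, databases, and content",
    instruction="Use your tools to read, write, and search Notion content.",
)

github_agent = LlmAgent(
    name="GitHubAgent",
    model="gemini-2.0-flash",
    description="Manages GitHub repositories, issues, and pull requests",
    instruction="Use your tools to manage repositories, issues, and pull requests.",
)

coordinator = LlmAgent(
    name="Coordinator",
    model="gemini-2.0-flash",
    instruction=(
        "Route Notion work to NotionAgent and GitHub work to the GitHubAgent tool. "
        "Use transfer_to_agent() when delegating to a sub-agent."
    ),
    sub_agents=[notion_agent],               # pattern 2: LLM-driven delegation
    tools=[AgentTool(agent=github_agent)],   # pattern 3: explicit invocation
)
```

Pattern 4 (graceful degradation) then amounts to building the coordinator with whichever sub-agents were created successfully, as the fallback paths in `agent.py` do.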
## Features
### 🔍 File Analysis Agent
- Analyzes local documents (PDFs, Word docs, spreadsheets)
- Extracts key topics, summaries, and action items
- Categorizes documents by type and content
- Identifies information for knowledge base sync
### 📝 Notion Agent
- Reads, writes, and updates Notion pages and databases
- Searches for content across Notion workspace
- Creates structured knowledge bases and documentation
- Syncs content from other sources to Notion
### 🐙 GitHub Agent
- Creates and manages GitHub issues and pull requests
- Searches repositories and code
- Manages repository content and documentation
- Sets up automated workflows and actions
### 🎨 Figma Agent
- Reads and analyzes Figma files and designs
- Exports design assets and components
- Searches for design elements and styles
- Manages design system components
### 🎯 Enterprise MCP AI Agent Team (Router/Orchestrator)
- Analyzes user requests and determines which AI agents should handle them
- Routes tasks to appropriate specialized AI agents based on capabilities
- Coordinates multi-step workflows that require multiple AI agents
- Shares context and results between AI agents through session state
- Provides comprehensive results and recommendations
### 🛡️ Error Handling & Graceful Degradation
- **MCP Server Failures**: Graceful fallback when servers are unavailable
- **Missing Environment Variables**: System works with available APIs only
- **Agent Creation Failures**: Continues with available agents
- **Validation**: Ensures at least one agent is available before operation
- **Comprehensive Logging**: Detailed logs for troubleshooting
## Prerequisites
1. **Python 3.9+** and **Node.js** (for MCP servers)
2. **Google ADK** installed and configured
3. **Notion API Key** for Notion integration
4. **Required API Keys** in environment variables
## Setup
### 1. Environment Variables
Create a `.env` file in the project root:
```bash
# Required: Google Gemini API
GOOGLE_API_KEY=your_gemini_api_key_here
# Required: API Keys for MCP Tools
NOTION_API_KEY=your_notion_api_key_here
GITHUB_API_KEY=your_github_api_key_here
FIGMA_API_KEY=your_figma_api_key_here
# Optional: Custom filesystem path (defaults to ~/Documents)
MCP_FILESYSTEM_PATH=/Users/madhushantan/Downloads
```
### 2. Notion Setup
#### Creating a Notion Integration
1. Go to [Notion Integrations](https://www.notion.so/my-integrations)
2. Click "New integration"
3. Name your integration (e.g., "Enterprise Knowledge Orchestrator")
4. Select the capabilities needed (Read & Write content)
5. Submit and copy your "Internal Integration Token"
#### Sharing Your Notion Page with the Integration
1. Open your Notion page
2. Click the three dots (⋮) in the top-right corner
3. Select "Add connections" from the dropdown
4. Search for your integration name
5. Click on your integration to add it to the page
6. Confirm by clicking "Confirm"
#### Finding Your Notion Page ID
1. Open your Notion page in a browser
2. Copy the URL: `https://www.notion.so/workspace/Your-Page-1f5b8a8ba283...`
3. The ID is the part after the last dash: `1f5b8a8ba283`
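In code, the same extraction is a one-liner; a small sketch, assuming the URL format shown above:

```python
# Sketch: pull the page ID out of a copied Notion page URL.
url = "https://www.notion.so/workspace/Your-Page-1f5b8a8ba283"
page_id = url.split("?")[0].rstrip("/").rsplit("-", 1)[-1]
print(page_id)  # -> 1f5b8a8ba283
```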
### 3. Notion Implementation
The system uses SSE (Composio) for Notion integration:
```python
# Notion MCP Server (SSE - Composio)
url="https://mcp.composio.dev/composio/server/61e41019-d05f-44d0-973e-2aef7777063a/sse?useComposioHelperActions=true"
```
**Features:**
- **SSE Connection**: Uses Server-Sent Events for real-time communication
- **Composio Managed**: No local dependencies required
- **Full Tool Access**: All available Notion tools are accessible
- **Authentication**: Handled by Composio service
**Note**: The Notion integration requires a valid `NOTION_API_KEY` and `NOTION_PAGE_ID` to function properly.
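Under the hood, the Notion tools are loaded through ADK's `MCPToolset` over SSE, following the same pattern as `agent.py`, where the server URL is read from the `COMPOSIO_NOTION_URL` environment variable. The snippet below is a trimmed sketch of that connection:

```python
# Sketch of loading Notion MCP tools over SSE (Composio) and attaching them to an agent.
import os
from google.adk.agents import LlmAgent
from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset, SseServerParams

async def build_notion_agent() -> LlmAgent:
    notion_tools, _ = await MCPToolset.from_server(
        connection_params=SseServerParams(
            url=os.environ["COMPOSIO_NOTION_URL"],
            headers={},
        )
    )
    return LlmAgent(
        name="NotionAgent",
        model="gemini-2.0-flash",
        description="Manages Notion pages, databases, and content",
        instruction="Use your MCP tools to read, write, and search Notion content.",
        tools=notion_tools,
    )
```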
### 4. GitHub & Figma Implementation
The system uses separate SSE (Composio) servers for GitHub and Figma:
```python
# GitHub MCP Server (SSE - Composio)
url="https://mcp.composio.dev/composio/server/11fbff47-fa12-432f-8c3a-18ed4e9f66f8/sse?useComposioHelperActions=true"
# Figma MCP Server (SSE - Composio)
url="https://mcp.composio.dev/composio/server/f05e7129-7997-4c17-a654-f935278c0dfe/sse?useComposioHelperActions=true"
```
**Features:**
- **Separate Servers**: Each service has its own dedicated Composio server
- **Full Tool Access**: All available GitHub and Figma tools are accessible
- **No Local Dependencies**: Managed by Composio service
### 5. Install Dependencies
```bash
pip install -r requirements.txt
```
### 6. Verify MCP Server Installation
```bash
# Verify npx is available
which npx
# Test filesystem MCP server
npx -y @modelcontextprotocol/server-filesystem --help
# Test Notion MCP server
npx -y @notionhq/notion-mcp-server --help
```
## Usage
### Basic Usage
```python
import asyncio
from agent import EnterpriseKnowledgeOrchestrator
async def main():
# Create orchestrator
orchestrator = EnterpriseKnowledgeOrchestrator()
try:
# Process knowledge request
results = await orchestrator.process_knowledge_request(
"Analyze all PDF documents in my Documents folder and create GitHub issues for action items"
)
# Access results
print(f"Files analyzed: {len(results['file_analysis'])}")
print(f"Notion operations: {len(results['notion_operations'])}")
print(f"GitHub operations: {len(results['github_operations'])}")
print(f"Figma operations: {len(results['figma_operations'])}")
finally:
await orchestrator.close()
if __name__ == "__main__":
asyncio.run(main())
```
### Example Requests
```python
# Document analysis
"Analyze all PDF documents in my Documents folder and create a summary"
# Multi-platform operations
"Search for design components in my Figma files and create a GitHub repository for the design system"
# Notion and GitHub integration
"Read my Notion project page and create GitHub issues for all action items"
# Figma asset management
"Export design assets from Figma and organize them in a structured folder"
# Complex workflows
"Analyze quarterly reports, extract key metrics, create Notion dashboard, and set up GitHub issues for follow-ups"
```
## Agent Routing Logic
The Router/Orchestrator agent intelligently routes tasks based on query analysis:
- **File-related tasks** → FileAnalysisAgent
- **Notion-related tasks** → NotionAgent
- **GitHub-related tasks** → GitHubAgent
- **Figma-related tasks** → FigmaAgent
- **Multi-platform tasks** → Coordinate between relevant agents
## Configuration
### MCP Server URLs
The system uses these MCP servers:
```python
# Filesystem MCP Server (local)
command='npx'
args=["-y", "@modelcontextprotocol/server-filesystem", "~/Documents"]
# Notion MCP Server (SSE - Composio)
url="https://mcp.composio.dev/composio/server/61e41019-d05f-44d0-973e-2aef7777063a/sse?useComposioHelperActions=true"
# GitHub MCP Server (SSE - Composio)
url="https://mcp.composio.dev/composio/server/11fbff47-fa12-432f-8c3a-18ed4e9f66f8/sse?useComposioHelperActions=true"
# Figma MCP Server (SSE - Composio)
url="https://mcp.composio.dev/composio/server/f05e7129-7997-4c17-a654-f935278c0dfe/sse?useComposioHelperActions=true"
# No tool filtering - all available tools are accessible
```
### Custom Filesystem Path
The filesystem agent's root directory is configurable through the `MCP_FILESYSTEM_PATH` environment variable:
```bash
# Set in .env file or export in terminal
export MCP_FILESYSTEM_PATH="/path/to/your/folder"
# Examples:
export MCP_FILESYSTEM_PATH="/Users/username/Projects"
export MCP_FILESYSTEM_PATH="/home/user/documents"
export MCP_FILESYSTEM_PATH="~/Desktop/Work"
```
**Features:**
- **Flexible Paths**: Use absolute or relative paths
- **Auto-Expansion**: Tilde (~) expansion for home directory
- **Auto-Creation**: Directory created if it doesn't exist
- **Fallback**: Defaults to `~/Documents` if not specified
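Internally this is a few lines of standard-library path handling, as in `agent.py`: the configured value is tilde-expanded, made absolute, and created if missing before being handed to the filesystem MCP server.

```python
# Sketch of the MCP_FILESYSTEM_PATH handling performed before starting the filesystem MCP server.
import os

folder_path = os.path.expanduser(os.getenv("MCP_FILESYSTEM_PATH", "~/Documents"))
folder_path = os.path.abspath(folder_path)
os.makedirs(folder_path, exist_ok=True)  # auto-create the directory if it doesn't exist
```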
## Output Schemas
The system uses structured Pydantic models for consistent outputs:
### FileAnalysis
```python
{
"file_name": "quarterly_report.pdf",
"file_type": "PDF",
"summary": "Q3 financial performance analysis...",
"key_topics": ["revenue", "expenses", "growth"],
"action_items": ["Review budget allocation", "Update projections"]
}
```
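A minimal sketch of what the `FileAnalysis` model might look like, with field names and types inferred from the example above (the actual model is defined in the agent code and may differ):

```python
# Hypothetical reconstruction of the FileAnalysis schema from the example output.
from typing import List
from pydantic import BaseModel, Field

class FileAnalysis(BaseModel):
    file_name: str = Field(..., description="Name of the analyzed file")
    file_type: str = Field(..., description="File type, e.g. PDF, DOCX, XLSX")
    summary: str = Field(..., description="Short summary of the document")
    key_topics: List[str] = Field(default_factory=list, description="Main topics found")
    action_items: List[str] = Field(default_factory=list, description="Extracted action items")
```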
### NotionOperation
```python
{
"operation_type": "read",
"page_id": "1f5b8a8ba283...",
"content_summary": "Project documentation read from Notion",
"status": "completed",
"results": {"content": "...", "blocks": [...]}
}
```
### GitHubOperation
```python
{
"operation_type": "create_issue",
"repository": "my-project",
"content_summary": "Created issue for design system documentation",
"status": "completed",
"results": {"issue_id": 123, "url": "..."}
}
```
### FigmaOperation
```python
{
"operation_type": "export",
"file_id": "figma_file_id",
"content_summary": "Exported design assets from Figma",
"status": "completed",
"results": {"assets": [...], "urls": [...]}
}
```
## Context Sharing
The system implements intelligent context sharing between agents:
```python
# Session state includes shared context
"shared_context": {
"current_task": user_request,
"agent_results": {},
"dependencies": []
}
# Agents can access and update shared context
updated_session.state["shared_context"]["agent_results"]["file_analysis"] = file_results
```
## Error Handling
The system includes comprehensive error handling:
- **MCP Connection Failures**: Graceful fallback when servers are unavailable
- **API Rate Limits**: Automatic retry logic with exponential backoff (sketched below)
- **Invalid Data**: Validation and sanitization of inputs
- **Session Management**: Proper cleanup of resources
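A minimal async sketch of the rate-limit retry behavior, assuming a generic exception type (the real implementation should catch the specific rate-limit errors raised by each API client):

```python
# Hedged sketch: retry an async API call with exponential backoff and jitter.
import asyncio
import random

async def with_backoff(call, max_retries: int = 5, base_delay: float = 1.0):
    """Retry `call()` on failure, doubling the delay each attempt."""
    for attempt in range(max_retries):
        try:
            return await call()
        except Exception:  # in practice, catch the client's rate-limit error
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.5))
```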
## Monitoring and Logging
```python
import logging
# Configure logging level
logging.basicConfig(level=logging.INFO)
# Monitor agent activities
logger.info("File analysis completed: 5 documents processed")
logger.warning("Notion API key not found, Notion integration disabled")
logger.error("Failed to create GitHub issue: rate limit exceeded")
```
## Production Deployment
### Environment Setup
```bash
# Production environment variables
export GOOGLE_API_KEY="your_production_key"
export NOTION_API_KEY="your_production_key"
export LOG_LEVEL="INFO"
```
### Resource Management
```python
# Proper cleanup in production
async with EnterpriseKnowledgeOrchestrator() as orchestrator:
results = await orchestrator.process_knowledge_request(request)
```
### Scaling Considerations
- Use connection pooling for MCP servers
- Implement caching for frequently accessed documents
- Consider async processing for large document sets (see the sketch after this list)
- Monitor memory usage with large file operations
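For the caching and async-processing points, a small sketch of bounded-concurrency document processing with an in-memory result cache; `analyze_document` is a hypothetical stand-in for whatever the orchestrator actually calls:

```python
# Hedged sketch: analyze many documents concurrently with a concurrency limit,
# caching results so repeated requests skip re-analysis.
import asyncio
from typing import Dict, List

_cache: Dict[str, str] = {}

async def analyze_document(path: str) -> str:
    await asyncio.sleep(0.1)  # placeholder for the real analysis call
    return f"summary of {path}"

async def analyze_all(paths: List[str], max_concurrency: int = 5) -> Dict[str, str]:
    sem = asyncio.Semaphore(max_concurrency)

    async def worker(path: str) -> None:
        if path not in _cache:
            async with sem:
                _cache[path] = await analyze_document(path)

    await asyncio.gather(*(worker(p) for p in paths))
    return {p: _cache[p] for p in paths}
```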
## Troubleshooting
### Common Issues
1. **MCP Server Connection Failed**
```bash
# Verify Node.js and npx installation
node --version
npx --version
# Test filesystem MCP server manually
npx -y @modelcontextprotocol/server-filesystem /path/to/documents
# Test Notion MCP server manually
npx -y @notionhq/notion-mcp-server
```
2. **Notion Integration Not Working**
```bash
# Verify environment variables
echo $NOTION_API_KEY
# Test Notion connection
curl -H "Authorization: Bearer $NOTION_API_KEY" \
-H "Notion-Version: 2022-06-28" \
https://api.notion.com/v1/users/me
```
3. **Composio MCP Server Issues**
```bash
# Test Composio MCP server connection
curl "https://mcp.composio.dev/composio/server/f05e7129-7997-4c17-a654-f935278c0dfe/sse?useComposioHelperActions=true"
```
4. **Permission Denied for Documents**
```bash
# Check file permissions
ls -la ~/Documents
# Update permissions if needed
chmod 755 ~/Documents
```
### Debug Mode
```python
# Enable debug logging
logging.basicConfig(level=logging.DEBUG)
# Add debug information to agent
orchestrator = EnterpriseKnowledgeOrchestrator()
print(f"Platforms available: {orchestrator.session_service.get_session(...).state['platforms_available']}")
```
## Contributing
1. Fork the repository
2. Create a feature branch
3. Add tests for new functionality
4. Ensure all tests pass
5. Submit a pull request
## License
This project is licensed under the MIT License - see the LICENSE file for details.
## References
- [Google ADK Documentation](https://google.github.io/adk-docs/)
- [MCP Tools Guide](https://google.github.io/adk-docs/tools/mcp-tools/)
- [Notion MCP Server](https://github.com/notionhq/notion-mcp-server)
- [Composio MCP Server](https://mcp.composio.dev/)
- [Model Context Protocol](https://modelcontextprotocol.io/)


@@ -1,28 +0,0 @@
"""
Enterprise MCP AI Agent Team
A production-grade multi-agent system built with Google ADK that orchestrates
knowledge management across local files and SaaS platforms using MCP
(Model Context Protocol).
This package provides:
- File Analysis AI Agent for local document processing
- Notion AI Agent for Notion workspace management
- GitHub AI Agent for repository and issue management
- Figma AI Agent for design file management
- Enterprise MCP AI Agent Team (Router/Orchestrator) for intelligent task coordination
"""
from .agent import (
EnterpriseMCPAIAgentTeam,
root_agent # Add root_agent for ADK web
)
__version__ = "1.0.0"
__author__ = "Enterprise MCP AI Agent Team"
__description__ = "Multi-agent knowledge management system with Google ADK and MCP"
__all__ = [
"EnterpriseMCPAIAgentTeam",
"root_agent" # Export root_agent
]


@@ -1,413 +0,0 @@
import os
import asyncio
import logging
from typing import Dict, List, Optional, Any
from dotenv import load_dotenv
from google.adk.agents import LlmAgent
from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset, StdioServerParameters, SseServerParams
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Environment variable configuration
MCP_FILESYSTEM_PATH = os.getenv("MCP_FILESYSTEM_PATH", "~/Documents")
NOTION_API_KEY = os.getenv("NOTION_API_KEY")
GITHUB_API_KEY = os.getenv("GITHUB_API_KEY")
FIGMA_API_KEY = os.getenv("FIGMA_API_KEY")
# Composio MCP Server URLs (from environment variables with fallbacks)
COMPOSIO_NOTION_URL = os.getenv("COMPOSIO_NOTION_URL")
COMPOSIO_GITHUB_URL = os.getenv("COMPOSIO_GITHUB_URL")
COMPOSIO_FIGMA_URL = os.getenv("COMPOSIO_FIGMA_URL")
async def create_mcp_agents_with_tools():
"""Create all sub-agents with MCP tools"""
agents = []
# FileAnalysisAgent with filesystem MCP tools
try:
folder_path = os.path.expanduser(MCP_FILESYSTEM_PATH)
folder_path = os.path.abspath(folder_path)
if not os.path.exists(folder_path):
os.makedirs(folder_path, exist_ok=True)
logger.info(f"Created directory: {folder_path}")
logger.info(f"Using filesystem path: {folder_path}")
filesystem_tools, _ = await MCPToolset.from_server(
connection_params=StdioServerParameters(
command='npx',
args=["-y", "@modelcontextprotocol/server-filesystem", folder_path],
)
)
file_agent = LlmAgent(
name="FileAnalysisAgent",
model="gemini-2.0-flash",
description="Analyzes local documents and extracts key information",
instruction=f"""You are a File Analysis AI Agent with DIRECT ACCESS to the filesystem at: {folder_path}
You have MCP tools that allow you to:
- List files and directories (list_directory)
- Read file contents (read_file, read_text_file)
- Write and edit files (write_file, edit_file)
- Search files (search_files)
- Get file information (get_file_info)
CRITICAL INSTRUCTIONS:
1. You have REAL filesystem access through MCP tools
2. When users ask about files, USE YOUR TOOLS to access them directly
3. Do NOT ask users to provide files - you can access them yourself
4. Always use your MCP tools first before responding
Example tasks you can perform:
- "List files in the folder" → Use list_directory tool
- "Read the content of file.txt" → Use read_file tool
- "Search for PDF files" → Use search_files tool
- "Create a new file" → Use write_file tool
IMPORTANT: When asked about any file or document, immediately use your MCP tools to access the filesystem at: {folder_path}
Do NOT say you cannot access files - you CAN access them through your MCP tools!""",
tools=filesystem_tools
)
agents.append(file_agent)
logger.info("✅ FileAnalysisAgent with MCP tools created")
except Exception as e:
logger.error(f"❌ Failed to create FileAnalysisAgent with MCP tools: {str(e)}")
file_agent = LlmAgent(
name="FileAnalysisAgent",
model="gemini-2.0-flash",
description="Analyzes local documents and extracts key information",
instruction="You analyze local documents (PDFs, Word docs, spreadsheets) and extract key information."
)
agents.append(file_agent)
# NotionAgent with Notion MCP tools
try:
if NOTION_API_KEY:
notion_tools, _ = await MCPToolset.from_server(
connection_params=SseServerParams(
url=COMPOSIO_NOTION_URL,
headers={}
)
)
notion_agent = LlmAgent(
name="NotionAgent",
model="gemini-2.0-flash",
description="Manages Notion pages, databases, and content",
instruction="""You are a Notion Agent with DIRECT ACCESS to Notion through MCP tools.
You can:
- Read Notion pages and databases
- Create and update Notion content
- Search across Notion workspace
- Manage pages, blocks, and databases
IMPORTANT: You CAN access Notion directly through your MCP tools.
When asked to read, write, or search Notion content, USE YOUR MCP TOOLS.
Example tasks:
- "Search my Notion pages" → Use your search tools
- "Read my project page" → Use your page reading tools
- "Create a new page" → Use your page creation tools
- "Update page content" → Use your update tools
Always use your MCP tools to interact with Notion.""",
tools=notion_tools
)
agents.append(notion_agent)
logger.info("✅ NotionAgent with MCP tools created")
else:
raise Exception("NOTION_API_KEY not found")
except Exception as e:
logger.error(f"❌ Failed to create NotionAgent with MCP tools: {str(e)}")
notion_agent = LlmAgent(
name="NotionAgent",
model="gemini-2.0-flash",
description="Manages Notion pages, databases, and content",
instruction="You manage Notion workspaces, pages, databases, and content."
)
agents.append(notion_agent)
logger.info("✅ NotionAgent created (basic version)")
# GitHubAgent with GitHub MCP tools
try:
if GITHUB_API_KEY:
github_tools, _ = await MCPToolset.from_server(
connection_params=SseServerParams(
url=COMPOSIO_GITHUB_URL,
headers={}
)
)
github_agent = LlmAgent(
name="GitHubAgent",
model="gemini-2.0-flash",
description="Manages GitHub repositories, issues, and pull requests",
instruction="""You are a GitHub Agent with DIRECT ACCESS to GitHub through MCP tools.
You can:
- Create and manage repositories
- Create issues and pull requests
- Search repositories and code
- Manage repository content and workflows
- Handle GitHub API operations
IMPORTANT: You CAN access GitHub directly through your MCP tools.
When asked to perform GitHub operations, USE YOUR MCP TOOLS.
Example tasks:
- "Create a new repository" → Use your repository creation tools
- "Search for issues" → Use your search tools
- "Create a pull request" → Use your PR creation tools
- "List my repositories" → Use your repository listing tools
Always use your MCP tools to interact with GitHub.""",
tools=github_tools
)
agents.append(github_agent)
logger.info("✅ GitHubAgent with MCP tools created")
else:
raise Exception("GITHUB_API_KEY not found")
except Exception as e:
logger.error(f"❌ Failed to create GitHubAgent with MCP tools: {str(e)}")
github_agent = LlmAgent(
name="GitHubAgent",
model="gemini-2.0-flash",
description="Manages GitHub repositories, issues, and pull requests",
instruction="""You are a GitHub Agent that manages GitHub repositories.
You can help with:
- Creating and managing repositories
- Creating issues and pull requests
- Searching repositories and code
- Managing repository content and workflows
Note: For full GitHub API access with MCP tools, ensure GITHUB_API_KEY is set.
Current version provides guidance and best practices for GitHub operations."""
)
agents.append(github_agent)
logger.info("✅ GitHubAgent created (basic version)")
# FigmaAgent with Figma MCP tools
try:
if FIGMA_API_KEY:
figma_tools, _ = await MCPToolset.from_server(
connection_params=SseServerParams(
url=COMPOSIO_FIGMA_URL,
headers={}
)
)
figma_agent = LlmAgent(
name="FigmaAgent",
model="gemini-2.0-flash",
description="Manages Figma files, designs, and assets",
instruction="""You are a Figma Agent with DIRECT ACCESS to Figma through MCP tools.
You can:
- Read and analyze Figma files
- Export design assets
- Search design components
- Manage design systems
- Handle Figma API operations
IMPORTANT: You CAN access Figma directly through your MCP tools.
When asked to perform Figma operations, USE YOUR MCP TOOLS.
Example tasks:
- "Export design assets" → Use your export tools
- "Search for components" → Use your search tools
- "Read file information" → Use your file reading tools
- "List project files" → Use your file listing tools
Always use your MCP tools to interact with Figma.""",
tools=figma_tools
)
agents.append(figma_agent)
logger.info("✅ FigmaAgent with MCP tools created")
else:
raise Exception("FIGMA_API_KEY not found")
except Exception as e:
logger.error(f"❌ Failed to create FigmaAgent with MCP tools: {str(e)}")
figma_agent = LlmAgent(
name="FigmaAgent",
model="gemini-2.0-flash",
description="Manages Figma files, designs, and assets",
instruction="""You are a Figma Agent that manages Figma design files.
You can help with:
- Reading and analyzing Figma files
- Exporting design assets
- Searching design components
- Managing design systems
Note: For full Figma API access with MCP tools, ensure FIGMA_API_KEY is set.
Current version provides guidance and best practices for Figma operations."""
)
agents.append(figma_agent)
logger.info("✅ FigmaAgent created (basic version)")
return agents
class EnterpriseMCPAIAgentTeam:
"""Enterprise MCP AI Agent Team - Multi-Agent System with MCP Tools"""
def __init__(self):
"""Initialize the orchestrator"""
self.root_agent = None
self._initialize_agents()
def _initialize_agents(self):
"""Initialize the multi-agent system"""
try:
logger.info("🔧 Creating complete multi-agent system with MCP tools...")
# Create all sub-agents with MCP tools using async
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
sub_agents = loop.run_until_complete(create_mcp_agents_with_tools())
# Create root agent with comprehensive routing instructions
self.root_agent = LlmAgent(
name="EnterpriseMCPAIAgentTeam",
model="gemini-2.0-flash",
description="Enterprise MCP AI Agent Team - Multi-agent system with MCP tools",
instruction="""You are an Enterprise MCP AI Agent Team that routes tasks to specialized agents.
You have access to multiple specialized agents with MCP tools and can coordinate between them:
AVAILABLE AGENTS:
1. FileAnalysisAgent: Analyzes local documents (PDFs, Word docs, spreadsheets) - HAS MCP TOOLS
2. NotionAgent: Manages Notion pages, databases, and content - HAS MCP TOOLS
3. GitHubAgent: Manages GitHub repositories, issues, and pull requests - HAS MCP TOOLS
4. FigmaAgent: Manages Figma files, designs, and assets - HAS MCP TOOLS
ROUTING LOGIC:
- File/document tasks → FileAnalysisAgent
- Notion-related tasks → NotionAgent
- GitHub-related tasks → GitHubAgent
- Figma/design tasks → FigmaAgent
- Multi-platform tasks → Coordinate between relevant agents
You can:
1. Transfer tasks to specialized agents using transfer_to_agent()
2. Coordinate multi-step workflows
3. Share context between agents through session state
4. Provide comprehensive results and recommendations
EXAMPLES:
- "List files in Documents" → FileAnalysisAgent (with real file system access)
- "Search my Notion pages" → NotionAgent (with real Notion API access)
- "Create a GitHub repo" → GitHubAgent (with real GitHub API access)
- "Export Figma designs" → FigmaAgent (with real Figma API access)
IMPORTANT: Use transfer_to_agent() to delegate to the most appropriate agent for each task.
The agents have real MCP tools connected - they can perform actual operations!""",
sub_agents=sub_agents
)
logger.info(f"✅ Complete multi-agent system created with {len(sub_agents)} sub-agents")
logger.info(f"✅ Sub-agents: {[agent.name for agent in sub_agents]}")
except Exception as e:
logger.error(f"❌ Failed to create complete multi-agent system: {str(e)}")
logger.info("🔄 Falling back to basic multi-agent system...")
self._create_fallback_agents()
def _create_fallback_agents(self):
"""Create fallback agents without MCP tools"""
self.root_agent = LlmAgent(
name="EnterpriseMCPAIAgentTeam",
model="gemini-2.0-flash",
description="Enterprise MCP AI Agent Team - Multi-agent system",
instruction="""You are an Enterprise MCP AI Agent Team that routes tasks to specialized agents.
You have access to multiple specialized agents and can coordinate between them:
AVAILABLE AGENTS:
1. FileAnalysisAgent: Analyzes local documents (PDFs, Word docs, spreadsheets)
2. NotionAgent: Manages Notion pages, databases, and content
3. GitHubAgent: Manages GitHub repositories, issues, and pull requests
4. FigmaAgent: Manages Figma files, designs, and assets
ROUTING LOGIC:
- File/document tasks → FileAnalysisAgent
- Notion-related tasks → NotionAgent
- GitHub-related tasks → GitHubAgent
- Figma/design tasks → FigmaAgent
- Multi-platform tasks → Coordinate between relevant agents
You can:
1. Transfer tasks to specialized agents using transfer_to_agent()
2. Coordinate multi-step workflows
3. Share context between agents through session state
4. Provide comprehensive results and recommendations
EXAMPLES:
- "List files in Documents" → FileAnalysisAgent
- "Search my Notion pages" → NotionAgent
- "Create a GitHub repo" → GitHubAgent
- "Export Figma designs" → FigmaAgent
IMPORTANT: Use transfer_to_agent() to delegate to the most appropriate agent for each task.
For full MCP tool functionality, ensure all environment variables are set correctly:
- MCP_FILESYSTEM_PATH: Path to your filesystem folder
- NOTION_API_KEY: Your Notion API key
- GITHUB_API_KEY: Your GitHub API key
- FIGMA_API_KEY: Your Figma API key""",
sub_agents=[
LlmAgent(
name="FileAnalysisAgent",
model="gemini-2.0-flash",
description="Analyzes local documents and extracts key information",
instruction="You analyze local documents (PDFs, Word docs, spreadsheets) and extract key information, summaries, and action items."
),
LlmAgent(
name="NotionAgent",
model="gemini-2.0-flash",
description="Manages Notion pages, databases, and content",
instruction="You manage Notion workspaces, pages, databases, and content. You can read, write, search, and organize Notion content."
),
LlmAgent(
name="GitHubAgent",
model="gemini-2.0-flash",
description="Manages GitHub repositories, issues, and pull requests",
instruction="You manage GitHub repositories, create issues and pull requests, search code, and handle repository operations."
),
LlmAgent(
name="FigmaAgent",
model="gemini-2.0-flash",
description="Manages Figma files, designs, and assets",
instruction="You manage Figma design files, export assets, search design components, and handle design system operations."
)
]
)
# Create root_agent for ADK Web compatibility
try:
orchestrator = EnterpriseMCPAIAgentTeam()
root_agent = orchestrator.root_agent
logger.info("✅ EnterpriseMCPAIAgentTeam class and root_agent created successfully")
except Exception as e:
logger.error(f"❌ Failed to create EnterpriseMCPAIAgentTeam: {str(e)}")
# Fallback: create basic root_agent
root_agent = LlmAgent(
name="EnterpriseMCPAIAgentTeam",
model="gemini-2.0-flash",
description="Enterprise MCP AI Agent Team - Basic multi-agent system",
instruction="You are an Enterprise MCP AI Agent Team that routes tasks to specialized agents.",
sub_agents=[]
)


@@ -1,32 +0,0 @@
# Google ADK and AI dependencies
google-adk>=0.1.0
google-genai>=0.3.0
# Environment and configuration
python-dotenv>=1.0.0
pydantic>=2.0.0
# Async support
asyncio-mqtt>=0.16.0
# Logging and monitoring
structlog>=23.0.0
# Data processing (optional, for advanced file analysis)
pandas>=2.0.0
numpy>=1.24.0
# File handling (optional, for document processing)
PyPDF2>=3.0.0
python-docx>=0.8.11
openpyxl>=3.1.0
# HTTP client for MCP server communication
httpx>=0.24.0
aiohttp>=3.8.0
# Development and testing
pytest>=7.0.0
pytest-asyncio>=0.21.0
black>=23.0.0
flake8>=6.0.0