Add AG2 adaptive research team example with routing and web fallback

This commit is contained in:
Vasiliy Radostev
2026-02-13 10:02:51 -08:00
parent 0d8454d991
commit 3c8138bb4f
8 changed files with 461 additions and 0 deletions

View File

@@ -138,6 +138,7 @@ A curated collection of **Awesome LLM apps built with RAG, AI Agents, Multi-agen
* [🧲 AI Competitor Intelligence Agent Team](advanced_ai_agents/multi_agent_apps/agent_teams/ai_competitor_intelligence_agent_team/)
* [💲 AI Finance Agent Team](advanced_ai_agents/multi_agent_apps/agent_teams/ai_finance_agent_team/)
* [🎨 AI Game Design Agent Team](advanced_ai_agents/multi_agent_apps/agent_teams/ai_game_design_agent_team/)
* [🧭 AG2 Adaptive Research Team](advanced_ai_agents/multi_agent_apps/agent_teams/ag2_adaptive_research_team/)
* [👨‍⚖️ AI Legal Agent Team (Cloud & Local)](advanced_ai_agents/multi_agent_apps/agent_teams/ai_legal_agent_team/)
* [💼 AI Recruitment Agent Team](advanced_ai_agents/multi_agent_apps/agent_teams/ai_recruitment_agent_team/)
* [🏠 AI Real Estate Agent Team](advanced_ai_agents/multi_agent_apps/agent_teams/ai_real_estate_agent_team)

View File

@@ -0,0 +1,52 @@
# AG2 Adaptive Research Team
A Streamlit app that combines multi-agent teamwork with agent-enabled routing and web fallback, built entirely on AG2.
## What This Shows
- **Agent teamwork**: explicit roles and sequential handoffs
- **Agent-enabled routing**: a clear decision step with local-doc vs web fallback
- **AG2-first implementation**: no Microsoft AutoGen dependency; installs via `ag2[openai]`
## Features
- Local document upload (PDF, TXT, MD)
- Routing decision based on document coverage
- Optional web fallback via SearxNG
- Verifier step to check evidence sufficiency
- Final synthesis with citations
## How To Run
1. Install dependencies:
```bash
pip install -r requirements.txt
```
2. Run the app:
```bash
streamlit run app.py
```
3. Provide your OpenAI API key in the sidebar and ask a question.
## How It Works
1. **Triage Agent** decides whether the question should be answered from local docs or the web.
2. **Local/Web Research Agent** collects evidence.
3. **Verifier Agent** checks evidence strength.
4. **Synthesizer Agent** produces the final answer with citations.
## Optional Add-ons (AG2 0.11)
- **AG-UI protocol integration** for richer UI rendering
- **OpenTelemetry tracing** for debugging multi-agent workflows
These are optional and not required to run this example.
## Notes
- Web fallback uses the SearxNG public instance at `https://searxng.site/search`.
- Default model is set to `gpt-5-nano`, following AG2 documentation examples.

View File

@@ -0,0 +1,85 @@
from __future__ import annotations
from typing import Any, Dict, Optional
from autogen import AssistantAgent
DEFAULT_MODEL = "gpt-5-nano"


def make_llm_config(api_key: str, model: str = DEFAULT_MODEL, temperature: float = 0.2) -> Dict[str, Any]:
    """Assemble an AG2 ``llm_config`` mapping for the OpenAI backend.

    Args:
        api_key: OpenAI API key shared by every agent in the team.
        model: Model identifier; defaults to ``DEFAULT_MODEL``.
        temperature: Sampling temperature passed through to the API.

    Returns:
        A dict with a one-entry ``config_list`` plus the temperature.
    """
    provider_entry = {
        "api_type": "openai",
        "model": model,
        "api_key": api_key,
    }
    return {"config_list": [provider_entry], "temperature": temperature}
def build_agents(api_key: str, model: str = DEFAULT_MODEL) -> Dict[str, AssistantAgent]:
    """Create the five-member research team, keyed by role.

    Args:
        api_key: OpenAI API key used to build the shared ``llm_config``.
        model: Model identifier shared by all agents.

    Returns:
        Mapping of role key ("triage", "local", "web", "verifier",
        "synthesizer") to its configured :class:`AssistantAgent`.
    """
    llm_config = make_llm_config(api_key=api_key, model=model)
    # Role key -> (agent name, system message). Keeping the table flat makes
    # it easy to see the whole team at a glance.
    role_specs = {
        "triage": (
            "triage_agent",
            "You are a triage agent for a research team. "
            "Classify whether the question can be answered from local documents or needs web research. "
            "Respond ONLY with JSON.",
        ),
        "local": (
            "local_research_agent",
            "You are a local research agent. Use only the provided document excerpts. "
            "Return JSON with evidence and a draft answer.",
        ),
        "web": (
            "web_research_agent",
            "You are a web research agent. Use the provided web search results only. "
            "Return JSON with evidence and a draft answer.",
        ),
        "verifier": (
            "verifier_agent",
            "You are a verifier. Check evidence sufficiency and identify gaps. "
            "Return JSON verdict and gaps.",
        ),
        "synthesizer": (
            "synthesizer_agent",
            "You are the final synthesizer. Produce a clear answer with citations to the evidence.",
        ),
    }
    return {
        role: AssistantAgent(name=agent_name, llm_config=llm_config, system_message=message)
        for role, (agent_name, message) in role_specs.items()
    }
def run_agent(agent: AssistantAgent, prompt: str) -> str:
    """Send a single user message to *agent* and return its reply as text.

    Args:
        agent: The AG2 agent to query.
        prompt: Message content sent with role ``user``.

    Returns:
        The reply text; ``""`` when the agent produced no reply or an
        empty content field.
    """
    reply = agent.generate_reply(messages=[{"role": "user", "content": prompt}])
    if reply is None:
        # Bug fix: generate_reply may return None (no reply). Previously
        # str(None) leaked the literal string "None" into downstream prompts.
        return ""
    if isinstance(reply, dict):
        return reply.get("content", "") or ""
    return str(reply)

View File

@@ -0,0 +1,69 @@
import os
import streamlit as st
from agents import DEFAULT_MODEL
from router import run_pipeline
from tools import build_local_index, load_documents
# Public SearxNG instance used when the team falls back to web research.
SEARXNG_BASE_URL = "https://searxng.site/search"

# Page chrome: title, caption, wide layout.
st.set_page_config(page_title="AG2 Adaptive Research Team", layout="wide")
st.title("AG2 Adaptive Research Team")
st.caption("Agent teamwork + agent-enabled routing, built with AG2")

# Sidebar: API credentials, model choice, and the web-fallback toggle.
with st.sidebar:
    st.header("API Configuration")
    api_key = st.text_input("OpenAI API Key", type="password")
    model = st.text_input("Model", value=DEFAULT_MODEL)
    web_enabled = st.toggle("Enable Web Fallback", value=True)
    st.markdown("Web fallback uses SearxNG.")

# Step 1: optional local documents that the triage agent can route to.
st.subheader("1. Upload Local Documents")
files = st.file_uploader(
    "Upload PDFs or text files",
    type=["pdf", "txt", "md"],
    accept_multiple_files=True,
)

# Step 2: the research question itself.
st.subheader("2. Ask a Question")
question = st.text_area("Research question")
run_clicked = st.button("Run Research")

if run_clicked:
    # Validate inputs before spending any model calls.
    if not api_key:
        st.error("Please provide your OpenAI API key.")
        st.stop()
    if not question.strip():
        st.error("Please enter a research question.")
        st.stop()
    # NOTE(review): the key is also passed explicitly to run_pipeline;
    # presumably some client path reads it from the environment — confirm.
    os.environ["OPENAI_API_KEY"] = api_key
    # Build the local chunk index from whatever was uploaded (possibly none).
    documents = load_documents(files or [])
    local_index = build_local_index(documents)
    with st.spinner("Running the AG2 team..."):
        result = run_pipeline(
            question=question,
            local_chunks=local_index,
            api_key=api_key,
            model=model,
            web_enabled=web_enabled,
            searxng_base_url=SEARXNG_BASE_URL,
        )
    # Render each pipeline stage's structured output in order.
    st.subheader("Routing Decision")
    st.json(result.get("triage", {}))
    st.subheader("Evidence")
    st.json(result.get("evidence", []))
    st.subheader("Verifier")
    st.json(result.get("verifier", {}))
    st.subheader("Final Answer")
    st.markdown(result.get("final_answer", ""))

View File

@@ -0,0 +1,3 @@
ag2[openai]>=0.11.0
streamlit>=1.33.0
pypdf>=4.2.0

View File

@@ -0,0 +1,141 @@
from __future__ import annotations
import json
import re
from typing import Any, Dict, List
from agents import build_agents, run_agent
from tools import Chunk, run_searxng, search_local
def _extract_json(text: str) -> Dict[str, Any]:
match = re.search(r"\{.*\}", text, re.DOTALL)
if not match:
return {}
try:
return json.loads(match.group(0))
except json.JSONDecodeError:
return {}
def _summarize_chunks(chunks: List[Chunk]) -> str:
lines = []
for chunk in chunks:
snippet = chunk.text[:300].strip()
lines.append(f"- {chunk.doc_name} [chunk {chunk.chunk_id}]: {snippet}")
return "\n".join(lines)
def run_pipeline(
    question: str,
    local_chunks: List[Chunk],
    api_key: str,
    model: str,
    web_enabled: bool,
    searxng_base_url: str,
) -> Dict[str, Any]:
    """Run the triage -> research -> verify -> synthesize pipeline.

    Args:
        question: The user's research question.
        local_chunks: Pre-chunked local documents (may be empty).
        api_key: OpenAI API key forwarded to every agent.
        model: Model name used by all agents.
        web_enabled: Whether the SearxNG web-fallback route is allowed.
        searxng_base_url: Base URL of the SearxNG instance.

    Returns:
        Dict with keys ``route``, ``triage``, ``evidence``, ``verifier``,
        and ``final_answer``. ``route`` reflects the branch that actually
        executed ("local" or "web").
    """
    agents = build_agents(api_key=api_key, model=model)

    doc_summary = "No local documents provided."
    if local_chunks:
        doc_names = sorted({chunk.doc_name for chunk in local_chunks})
        doc_summary = f"Local docs: {', '.join(doc_names)} (total chunks: {len(local_chunks)})"

    # --- Stage 1: triage decides the route (local docs vs web search). ---
    triage_prompt = f"""
Question: {question}
{doc_summary}
Decide the best route. Output JSON with keys:
- route: "local" or "web"
- confidence: number 0 to 1
- rationale: short string
"""
    triage_raw = run_agent(agents["triage"], triage_prompt)
    triage = _extract_json(triage_raw)
    route = triage.get("route", "local")
    if not local_chunks and web_enabled:
        # Nothing to search locally, so force the web route.
        route = "web"
    if route == "web" and not web_enabled:
        # Bug fix: previously the local branch executed here while the
        # result still reported route="web". Normalize so the reported
        # route matches the branch that actually runs.
        route = "local"

    # --- Stage 2: evidence collection on the chosen route. ---
    evidence: List[Dict[str, Any]] = []
    draft_answer = ""
    if route == "web":
        search_results = run_searxng(question, base_url=searxng_base_url, max_results=5)
        formatted_results = "\n".join(
            f"- {item.get('title', 'Untitled')} | {item.get('link', '')} | {item.get('snippet', '')}"
            for item in search_results
        )
        web_prompt = f"""
Question: {question}
Web results:
{formatted_results}
Return JSON with keys:
- evidence: list of {{source, summary}}
- draft_answer: string
"""
        web_raw = run_agent(agents["web"], web_prompt)
        web_json = _extract_json(web_raw)
        evidence = web_json.get("evidence", [])
        draft_answer = web_json.get("draft_answer", "")
    else:
        hits = search_local(question, local_chunks, top_k=5)
        formatted_hits = _summarize_chunks(hits)
        local_prompt = f"""
Question: {question}
Document excerpts:
{formatted_hits}
Return JSON with keys:
- evidence: list of {{source, summary}}
- draft_answer: string
"""
        local_raw = run_agent(agents["local"], local_prompt)
        local_json = _extract_json(local_raw)
        evidence = local_json.get("evidence", [])
        draft_answer = local_json.get("draft_answer", "")

    # --- Stage 3: verifier judges whether the evidence is sufficient. ---
    verifier_prompt = f"""
Question: {question}
Draft answer:
{draft_answer}
Evidence:
{json.dumps(evidence, indent=2)}
Return JSON with keys:
- verdict: "sufficient" or "insufficient"
- gaps: list of short strings
"""
    verifier_raw = run_agent(agents["verifier"], verifier_prompt)
    verifier = _extract_json(verifier_raw)

    # --- Stage 4: synthesizer writes the cited final answer. ---
    synth_prompt = f"""
Question: {question}
Draft answer:
{draft_answer}
Evidence:
{json.dumps(evidence, indent=2)}
Verifier verdict:
{json.dumps(verifier, indent=2)}
Provide the final answer with clear citations to the evidence sources.
"""
    final_answer = run_agent(agents["synthesizer"], synth_prompt)

    return {
        "route": route,
        "triage": triage,
        "evidence": evidence,
        "verifier": verifier,
        "final_answer": final_answer,
    }

View File

@@ -0,0 +1,5 @@
AG2 Adaptive Research Team Sample Notes
This folder can hold local documents that the app can search. Try asking:
- "What does this sample note describe?"
- "Summarize the purpose of the AG2 Adaptive Research Team sample docs."

View File

@@ -0,0 +1,105 @@
from __future__ import annotations
import re
from dataclasses import dataclass
from typing import Iterable, List
from pypdf import PdfReader
from autogen.tools.experimental import SearxngSearchTool
@dataclass
class Document:
    """A single uploaded file: its display name plus the full extracted text."""

    name: str
    text: str
@dataclass
class Chunk:
    """One word-window slice of a document, identified by source name and 1-based id."""

    doc_name: str
    chunk_id: int
    text: str
def _clean_text(text: str) -> str:
return re.sub(r"\s+", " ", text).strip()
def _tokenize(text: str) -> List[str]:
return re.findall(r"[a-zA-Z0-9]+", text.lower())
def load_documents(uploaded_files: Iterable) -> List[Document]:
    """Extract text from uploaded PDF/text files, skipping empty results.

    PDF files are read page-by-page via pypdf; anything else is decoded as
    UTF-8 with a latin-1 fallback. Files that yield no text are dropped.
    """
    documents: List[Document] = []
    for uploaded in uploaded_files:
        name = uploaded.name
        if name.lower().endswith(".pdf"):
            reader = PdfReader(uploaded)
            # extract_text() may return None for image-only pages.
            page_texts = [page.extract_text() or "" for page in reader.pages]
            text = _clean_text("\n".join(page_texts))
        else:
            raw = uploaded.read()
            try:
                decoded = raw.decode("utf-8")
            except UnicodeDecodeError:
                decoded = raw.decode("latin-1")
            text = _clean_text(decoded)
        if text:
            documents.append(Document(name=name, text=text))
    return documents
def chunk_text(text: str, chunk_size: int = 800, overlap: int = 120) -> List[str]:
    """Split *text* into overlapping word-window chunks.

    Args:
        text: Source text; split on whitespace.
        chunk_size: Maximum words per chunk (clamped to at least 1).
        overlap: Words shared between consecutive chunks (clamped below
            ``chunk_size``).

    Returns:
        List of space-joined chunks; ``[]`` for empty/whitespace input.
    """
    words = text.split()
    if not words:
        return []
    # Bug fix: the original looped forever when chunk_size <= 0 or
    # overlap >= chunk_size, because `start` never advanced. Clamp both
    # so every iteration makes forward progress.
    chunk_size = max(1, chunk_size)
    overlap = min(max(0, overlap), chunk_size - 1)
    chunks: List[str] = []
    total = len(words)
    start = 0
    while start < total:
        end = min(total, start + chunk_size)
        chunks.append(" ".join(words[start:end]))
        if end == total:
            break
        start = max(0, end - overlap)
    return chunks
def build_local_index(documents: List[Document]) -> List[Chunk]:
    """Flatten every document into one searchable list of chunks (ids are 1-based per document)."""
    return [
        Chunk(doc_name=doc.name, chunk_id=position, text=piece)
        for doc in documents
        for position, piece in enumerate(chunk_text(doc.text), start=1)
    ]
def search_local(query: str, index: List[Chunk], top_k: int = 5) -> List[Chunk]:
    """Rank chunks by token overlap with *query* and return the top matches.

    Chunks sharing no tokens with the query are dropped; ties keep their
    original index order (stable sort).
    """
    query_tokens = set(_tokenize(query))
    matches = []
    for chunk in index:
        score = len(query_tokens.intersection(_tokenize(chunk.text)))
        if score:
            matches.append((score, chunk))
    matches.sort(key=lambda pair: pair[0], reverse=True)
    return [chunk for _, chunk in matches[:top_k]]
def run_searxng(query: str, base_url: str, max_results: int = 5) -> List[dict]:
    """Query a SearxNG instance and normalize the output to a list of result dicts.

    The AG2 tool may return either a dict wrapping a ``results`` list or a
    bare list; anything else maps to ``[]``.
    """
    tool = SearxngSearchTool(base_url=base_url)
    raw = tool(query=query, max_results=max_results)
    if isinstance(raw, list):
        return raw
    if isinstance(raw, dict):
        return raw.get("results", [])
    return []
return []