Merge pull request #460 from STiFLeR7/feat/devpulse-ai-reference

feat(devpulse_ai): add multi-agent signal intelligence reference implementation
This commit is contained in:
Shubham Saboo
2026-02-24 21:20:00 -08:00
committed by GitHub
18 changed files with 1983 additions and 0 deletions

View File

@@ -0,0 +1,4 @@
# Python bytecode
__pycache__/
*.pyc
*.pyo

View File

@@ -0,0 +1,195 @@
# 🧠 DevPulseAI — Multi-Agent Signal Intelligence
A reference implementation demonstrating how to build a **multi-agent pipeline** that aggregates technical signals from multiple sources, scores them for relevance, assesses risks, and synthesizes an actionable intelligence digest.
> **Design Philosophy:** Agents are used **only where reasoning is required.** Deterministic operations (collection, normalization, deduplication) are implemented as plain utilities — not agents.
---
## Architecture
```
┌─────────────────────────────────────────────────────────┐
│ DATA SOURCES │
│ GitHub · ArXiv · HackerNews · Medium · HuggingFace │
└──────────────────────┬──────────────────────────────────┘
│ raw signals
┌──────────────────────────────────────────────────────────┐
│ SignalCollector (UTILITY — no LLM) │
│ • Normalizes to unified schema │
│ • Deduplicates via source:id composite key │
│ • Filters incomplete signals │
└──────────────────────┬───────────────────────────────────┘
│ normalized signals
┌──────────────────────────────────────────────────────────┐
│ RelevanceAgent (AGENT — gpt-4.1-mini) │
│ • Scores each signal 0100 for developer relevance │
│ • Considers: novelty, impact, actionability, timeliness │
│ • Falls back to heuristics if no API key │
└──────────────────────┬───────────────────────────────────┘
│ scored signals
┌──────────────────────────────────────────────────────────┐
│ RiskAgent (AGENT — gpt-4.1-mini) │
│ • Assesses security vulnerabilities │
│ • Flags breaking changes and deprecations │
│ • Rates risk: LOW / MEDIUM / HIGH / CRITICAL │
└──────────────────────┬───────────────────────────────────┘
│ risk-assessed signals
┌──────────────────────────────────────────────────────────┐
│ SynthesisAgent (AGENT — gpt-4.1) │
│ • Cross-references relevance + risk data │
│ • Produces executive summary │
│ • Generates actionable recommendations │
└──────────────────────┬───────────────────────────────────┘
📄 Intelligence Digest
```
---
## Why Signal Collection Is Not an Agent
This is an **intentional, opinionated design choice** — not a shortcut.
Signal collection involves:
- Fetching data from HTTP APIs (deterministic)
- Normalizing fields to a unified schema (mechanical transformation)
- Deduplicating by composite key (hash comparison)
**None of these tasks require reasoning, judgment, or language understanding.**
Wrapping collection in an `Agent` class would be _decorative_ — it would have an LLM import that never gets called. This misleads readers into thinking an LLM is necessary, when the actual logic is a `for` loop with a `set()`.
> **Rule of thumb:** If you can write the logic as a pure function with no ambiguity, it's a utility. If the output depends on understanding context, making judgment calls, or generating natural language, it's an agent.
---
## Agent Roles & Model Selection
| Component | Type | Model | Why This Model |
|---|---|---|---|
| `SignalCollector` | **Utility** | _none_ | Deterministic — no reasoning required |
| `RelevanceAgent` | **Agent** | `gpt-4.1-mini` | Classification task — fast, cheap, high-volume |
| `RiskAgent` | **Agent** | `gpt-4.1-mini` | Structured analysis — careful but not expensive |
| `SynthesisAgent` | **Agent** | `gpt-4.1` | Cross-referencing & summarization — needs strongest reasoning |
**Single provider by default (OpenAI)** to reduce onboarding friction. Override per-agent via environment variables:
```bash
export MODEL_RELEVANCE=gpt-4.1-nano # cheaper, faster
export MODEL_RISK=o4-mini # deeper reasoning for risk
export MODEL_SYNTHESIS=gpt-4.1 # default, strongest
```
---
## How to Run
### Quick Verification (No API Key Required)
```bash
cd advanced_ai_agents/multi_agent_apps/devpulse_ai
python verify.py
```
This runs the full pipeline with mock data in **<1 second**. No network calls, no API keys.
Expected output:
```
[OK] DevPulseAI reference pipeline executed successfully
```
### Full Pipeline (With API Key)
```bash
pip install -r requirements.txt
export OPENAI_API_KEY=sk-...
python main.py
```
Without an API key, agents automatically fall back to heuristic scoring.
### Streamlit Dashboard
```bash
streamlit run streamlit_app.py
```
---
## Project Structure
```
devpulse_ai/
├── agents/
│ ├── __init__.py # Package exports + design docs
│ ├── signal_collector.py # UTILITY — normalize & dedup
│ ├── relevance_agent.py # AGENT — score relevance (gpt-4.1-mini)
│ ├── risk_agent.py # AGENT — assess risks (gpt-4.1-mini)
│ └── synthesis_agent.py # AGENT — produce digest (gpt-4.1)
├── adapters/
│ ├── github.py # GitHub trending repos
│ ├── arxiv.py # ArXiv recent papers
│ ├── hackernews.py # HackerNews top stories
│ ├── medium.py # Medium AI/ML blogs
│ └── huggingface.py # HuggingFace trending models
├── workflows/
│ └── signal-intelligence-pipeline.json
├── main.py # Full pipeline runner
├── verify.py # Mock-data verification (<1s)
├── streamlit_app.py # Interactive dashboard
└── requirements.txt # Minimal deps (single provider)
```
---
## Optional Extensions (Advanced Users)
These are **not required** for the reference implementation, but show how the architecture extends:
1. **Multi-provider models** — Swap `RelevanceAgent` to use Anthropic Claude or Google Gemini by updating the model config. The `agno` framework supports multiple providers.
2. **Vector search** — Add a Pinecone or Qdrant adapter to store and retrieve signals semantically for long-term pattern detection.
3. **Streaming digests** — Use WebSocket streaming from `SynthesisAgent` for real-time intelligence feeds.
4. **Custom adapters** — Add new signal sources by implementing a `fetch_*` function that returns `List[Dict]` with the standard schema (`id`, `source`, `title`, `description`, `url`, `metadata`).
5. **Feedback loop** — Store user feedback (👍/👎) in Supabase and use it to fine-tune relevance scoring over time.
---
## Dependencies
```
agno # Agent framework
openai # LLM provider (single default)
httpx # HTTP client for adapters
feedparser # RSS/Atom parsing for Medium
streamlit>=1.30 # Interactive dashboard
```
No `google-generativeai` required. Gemini is an optional extension if users want multi-provider support — install `google-genai` (not the deprecated `google-generativeai`) separately.
---
## Design Tradeoffs
| Decision | Tradeoff | Why |
|---|---|---|
| Single provider default | Less flexibility | Reduces onboarding from 2+ keys to 1 |
| Signal collection as utility | Less "agentic" demo | Honest architecture — agents where reasoning exists |
| Heuristic fallbacks | Lower quality without API key | Pipeline always works, even for evaluation |
| 5 signals per source default | Less data | Keeps demo fast (<10s with API, <1s mock) |
| No async in agents | Less throughput | Simpler code, clearer educational value |
---
_Built as a reference implementation for [awesome-llm-apps](https://github.com/Shubhamsaboo/awesome-llm-apps)._

View File

@@ -0,0 +1,86 @@
"""
ArXiv Adapter - Fetches recent AI/ML research papers.
This is a simplified, stateless adapter for the DevPulseAI reference implementation.
ArXiv API is public and requires no authentication.
"""
import httpx
import xml.etree.ElementTree as ET
from typing import List, Dict, Any
def fetch_arxiv_papers(limit: int = 5) -> List[Dict[str, Any]]:
"""
Fetch recent AI/ML papers from ArXiv.
Args:
limit: Maximum number of papers to return.
Returns:
List of signal dictionaries with standardized schema.
"""
base_url = "https://export.arxiv.org/api/query"
params = {
"search_query": "cat:cs.AI OR cat:cs.LG",
"start": 0,
"max_results": limit,
"sortBy": "submittedDate",
"sortOrder": "descending"
}
signals = []
try:
response = httpx.get(base_url, params=params, timeout=15.0)
response.raise_for_status()
# Parse Atom XML response
root = ET.fromstring(response.content)
ns = {"atom": "http://www.w3.org/2005/Atom"}
for entry in root.findall("atom:entry", ns):
title_elem = entry.find("atom:title", ns)
summary_elem = entry.find("atom:summary", ns)
id_elem = entry.find("atom:id", ns)
published_elem = entry.find("atom:published", ns)
title = title_elem.text.strip() if title_elem is not None else "Untitled"
summary = summary_elem.text.strip() if summary_elem is not None else ""
arxiv_id = id_elem.text.strip() if id_elem is not None else ""
published = published_elem.text if published_elem is not None else ""
# Get PDF link
pdf_link = arxiv_id
link_elem = entry.find("atom:link[@title='pdf']", ns)
if link_elem is not None:
pdf_link = link_elem.attrib.get("href", arxiv_id)
signal = {
"id": arxiv_id,
"source": "arxiv",
"title": title,
"description": summary[:500] + "..." if len(summary) > 500 else summary,
"url": arxiv_id,
"metadata": {
"pdf": pdf_link,
"published": published
}
}
signals.append(signal)
except httpx.HTTPError as e:
print(f"[ArXiv Adapter] HTTP error: {e}")
except ET.ParseError as e:
print(f"[ArXiv Adapter] XML parse error: {e}")
except Exception as e:
print(f"[ArXiv Adapter] Error: {e}")
return signals
if __name__ == "__main__":
# Quick test
results = fetch_arxiv_papers(limit=3)
for r in results:
print(f"- {r['title'][:60]}...")

View File

@@ -0,0 +1,65 @@
"""
GitHub Adapter - Fetches trending repositories from GitHub.
This is a simplified, stateless adapter for the DevPulseAI reference implementation.
No authentication required for basic public API access.
"""
import httpx
from datetime import datetime, timedelta
from typing import List, Dict, Any
def fetch_github_trending(limit: int = 5) -> List[Dict[str, Any]]:
"""
Fetch trending GitHub repositories created in the last 24 hours.
Args:
limit: Maximum number of repositories to return.
Returns:
List of signal dictionaries with standardized schema.
"""
base_url = "https://api.github.com/search/repositories"
date_query = (datetime.utcnow() - timedelta(days=1)).strftime("%Y-%m-%d")
params = {
"q": f"created:>{date_query} sort:stars",
"per_page": limit
}
signals = []
try:
response = httpx.get(base_url, params=params, timeout=10.0)
response.raise_for_status()
data = response.json()
for item in data.get("items", []):
signal = {
"id": str(item["id"]),
"source": "github",
"title": item["full_name"],
"description": item.get("description") or "No description",
"url": item["html_url"],
"metadata": {
"stars": item["stargazers_count"],
"language": item.get("language"),
"topics": item.get("topics", [])
}
}
signals.append(signal)
except httpx.HTTPError as e:
print(f"[GitHub Adapter] HTTP error: {e}")
except Exception as e:
print(f"[GitHub Adapter] Error: {e}")
return signals
if __name__ == "__main__":
# Quick test
results = fetch_github_trending(limit=3)
for r in results:
print(f"- {r['title']}: {r['metadata']['stars']} stars")

View File

@@ -0,0 +1,72 @@
"""
HackerNews Adapter - Fetches top AI/ML stories from HackerNews.
This is a simplified, stateless adapter for the DevPulseAI reference implementation.
Uses the Algolia HN API for better search capabilities.
"""
import httpx
from typing import List, Dict, Any
def fetch_hackernews_stories(limit: int = 5) -> List[Dict[str, Any]]:
"""
Fetch recent AI/ML related stories from HackerNews.
Args:
limit: Maximum number of stories to return.
Returns:
List of signal dictionaries with standardized schema.
"""
base_url = "https://hn.algolia.com/api/v1/search_by_date"
params = {
"query": "AI OR LLM OR Machine Learning OR GPT",
"tags": "story",
"hitsPerPage": limit,
"numericFilters": "points>5"
}
signals = []
try:
response = httpx.get(base_url, params=params, timeout=10.0)
response.raise_for_status()
data = response.json()
for hit in data.get("hits", []):
# Skip stories without URLs (Ask HN, etc.)
if not hit.get("url") and not hit.get("story_text"):
continue
external_id = str(hit.get("objectID", ""))
hn_url = f"https://news.ycombinator.com/item?id={external_id}"
signal = {
"id": external_id,
"source": "hackernews",
"title": hit.get("title", "Untitled"),
"description": hit.get("story_text", "")[:300] if hit.get("story_text") else "",
"url": hit.get("url") or hn_url,
"metadata": {
"points": hit.get("points", 0),
"comments": hit.get("num_comments", 0),
"author": hit.get("author", "unknown"),
"hn_url": hn_url
}
}
signals.append(signal)
except httpx.HTTPError as e:
print(f"[HackerNews Adapter] HTTP error: {e}")
except Exception as e:
print(f"[HackerNews Adapter] Error: {e}")
return signals
if __name__ == "__main__":
# Quick test
results = fetch_hackernews_stories(limit=3)
for r in results:
print(f"- {r['title']}: {r['metadata']['points']} points")

View File

@@ -0,0 +1,80 @@
"""
HuggingFace Adapter - Fetches trending models from HuggingFace Hub.
This is a simplified, stateless adapter for the DevPulseAI reference implementation.
Uses the public HuggingFace API (no authentication required for basic access).
"""
import httpx
from typing import List, Dict, Any
def fetch_huggingface_models(limit: int = 5) -> List[Dict[str, Any]]:
"""
Fetch trending/popular models from HuggingFace Hub.
Args:
limit: Maximum number of models to return.
Returns:
List of signal dictionaries with standardized schema.
"""
base_url = "https://huggingface.co/api/models"
params = {
"sort": "likes",
"direction": "-1",
"limit": limit
}
signals = []
try:
response = httpx.get(base_url, params=params, timeout=10.0)
response.raise_for_status()
data = response.json()
for item in data:
model_id = item.get("modelId", item.get("id", "unknown"))
# Build description from model metadata
tags = item.get("tags", [])
pipeline = item.get("pipeline_tag", "")
description_parts = []
if pipeline:
description_parts.append(f"Pipeline: {pipeline}")
if tags:
description_parts.append(f"Tags: {', '.join(tags[:5])}")
description_parts.append(f"Downloads: {item.get('downloads', 0):,}")
description_parts.append(f"Likes: {item.get('likes', 0):,}")
signal = {
"id": model_id,
"source": "huggingface",
"title": f"HF Model: {model_id}",
"description": " | ".join(description_parts),
"url": f"https://huggingface.co/{model_id}",
"metadata": {
"downloads": item.get("downloads", 0),
"likes": item.get("likes", 0),
"pipeline_tag": pipeline,
"tags": tags[:10],
"author": item.get("author", "")
}
}
signals.append(signal)
except httpx.HTTPError as e:
print(f"[HuggingFace Adapter] HTTP error: {e}")
except Exception as e:
print(f"[HuggingFace Adapter] Error: {e}")
return signals
if __name__ == "__main__":
# Quick test
results = fetch_huggingface_models(limit=3)
for r in results:
print(f"- {r['title']}: {r['metadata']['likes']} likes")

View File

@@ -0,0 +1,70 @@
"""
Medium Adapter - Fetches tech blogs from Medium and other RSS feeds.
This is a simplified, stateless adapter for the DevPulseAI reference implementation.
Uses feedparser to fetch from RSS/Atom feeds.
"""
import feedparser
from typing import List, Dict, Any
# Tech blog feeds to monitor
FEEDS = [
"https://medium.com/feed/tag/artificial-intelligence",
"https://medium.com/feed/tag/machine-learning",
"https://medium.com/feed/@netflixtechblog",
"https://engineering.fb.com/feed/",
]
def fetch_medium_blogs(limit: int = 5) -> List[Dict[str, Any]]:
"""
Fetch recent tech blogs from Medium and engineering blogs.
Args:
limit: Maximum number of entries per feed.
Returns:
List of signal dictionaries with standardized schema.
"""
signals = []
for feed_url in FEEDS:
try:
feed = feedparser.parse(feed_url)
for entry in feed.entries[:limit]:
# Get summary or description
summary = getattr(entry, "summary", "") or getattr(entry, "description", "")
# Clean HTML tags from summary (simple approach)
if summary:
import re
summary = re.sub(r'<[^>]+>', '', summary)[:500]
signal = {
"id": entry.get("id", entry.link),
"source": "medium",
"title": entry.title,
"description": summary,
"url": entry.link,
"metadata": {
"published": getattr(entry, "published", ""),
"author": getattr(entry, "author", "Unknown"),
"feed": feed_url
}
}
signals.append(signal)
except Exception as e:
print(f"[Medium Adapter] Error fetching {feed_url}: {e}")
return signals
if __name__ == "__main__":
# Quick test
results = fetch_medium_blogs(limit=2)
for r in results:
print(f"- {r['title'][:60]}...")

View File

@@ -0,0 +1,33 @@
"""
DevPulseAI Agents Package
This package contains the intelligence pipeline components:
- SignalCollector: Pure utility (NOT an agent) — normalizes and deduplicates signals.
Signal collection is deterministic and intentionally not agent-driven.
- RelevanceAgent: LLM agent — scores signals 0-100 for developer relevance.
Uses fast model (gpt-4.1-mini) for high-throughput classification.
- RiskAgent: LLM agent — assesses security risks and breaking changes.
Uses structured-reasoning model (gpt-4.1-mini) for careful analysis.
- SynthesisAgent: LLM agent — produces final intelligence digest.
Uses strongest model (gpt-4.1) for cross-referencing and summarization.
Design Principle:
Agents are used ONLY where reasoning is required.
Deterministic operations (collection, normalization, dedup) are utilities.
"""
from .signal_collector import SignalCollector
from .relevance_agent import RelevanceAgent
from .risk_agent import RiskAgent
from .synthesis_agent import SynthesisAgent
__all__ = [
"SignalCollector",
"RelevanceAgent",
"RiskAgent",
"SynthesisAgent",
]

View File

@@ -0,0 +1,124 @@
"""
Relevance Agent — Scores signals by developer relevance (0100).
This agent uses LLM reasoning to evaluate each signal's importance to
AI/ML developers. It's a legitimate agent because relevance scoring
requires judgment, context understanding, and nuanced assessment that
pure heuristics cannot capture.
Model Selection:
Uses a fast, cost-efficient model (gpt-4.1-mini by default) because
relevance scoring is high-volume and doesn't require deep reasoning —
it's a classification task, not a synthesis task.
"""
import json
from typing import Dict, Any, List
from agno.agent import Agent
from agno.models.openai import OpenAIChat
# Central model config — override via MODEL_RELEVANCE env var
import os
DEFAULT_MODEL = os.environ.get("MODEL_RELEVANCE", "gpt-4.1-mini")
class RelevanceAgent:
"""
Agent that scores signals based on relevance to developers.
Why this IS an agent (unlike SignalCollector):
Relevance scoring requires understanding context, assessing novelty,
and making judgment calls about what matters to developers. This is
inherently a reasoning task that benefits from LLM capabilities.
Responsibilities:
- Score signals 0100 based on developer relevance
- Provide reasoning for each score
- Gracefully fall back to heuristics when LLM unavailable
"""
def __init__(self, model_id: str = None):
"""
Initialize the Relevance Agent.
Args:
model_id: OpenAI model to use. Defaults to gpt-4.1-mini (fast, cheap).
"""
self.model_id = model_id or DEFAULT_MODEL
self.agent = Agent(
name="Relevance Scorer",
model=OpenAIChat(id=self.model_id),
role="Scores technical signals based on developer relevance",
instructions=[
"Score each signal from 0-100 based on relevance.",
"Consider: novelty, impact, actionability, and timeliness.",
"Prioritize signals relevant to AI/ML engineers.",
"Provide brief reasoning for each score.",
],
markdown=True,
)
def score(self, signal: Dict[str, Any]) -> Dict[str, Any]:
"""
Score a single signal for developer relevance.
Attempts LLM scoring first, falls back to heuristics on failure.
"""
prompt = f"""Rate the relevance of this signal for AI/ML developers.
Score from 0-100 where:
- 0-30: Low relevance (noise, off-topic)
- 31-60: Moderate relevance (interesting but not urgent)
- 61-80: High relevance (important for developers to know)
- 81-100: Critical relevance (must-know, actionable)
Signal:
- Source: {signal.get('source', 'unknown')}
- Title: {signal.get('title', 'Untitled')}
- Description: {signal.get('description', '')[:500]}
Respond with ONLY a JSON object:
{{"score": <number>, "reasoning": "<one sentence>"}}"""
try:
response = self.agent.run(prompt, stream=False)
return self._parse_response(response.content, signal)
except Exception as e:
return self._fallback_score(signal, str(e))
def score_batch(self, signals: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Score multiple signals, returning each with a 'relevance' key."""
scored = []
for signal in signals:
result = self.score(signal)
scored.append({**signal, "relevance": result})
return scored
def _parse_response(self, content: str, signal: Dict) -> Dict[str, Any]:
"""Parse LLM JSON response into structured output."""
try:
text = content.strip()
if "```" in text:
text = text.split("```")[1].replace("json", "").strip()
return json.loads(text)
except (json.JSONDecodeError, IndexError):
return self._fallback_score(signal, "Parse error")
def _fallback_score(self, signal: Dict, error: str) -> Dict[str, Any]:
"""
Heuristic fallback when LLM is unavailable.
Uses metadata signals (stars, points) as rough relevance proxies.
This is intentionally simple — the LLM path is the real logic.
"""
score = 50 # Default: moderate
metadata = signal.get("metadata", {})
if metadata.get("stars", 0) > 100:
score += 20
if metadata.get("points", 0) > 50:
score += 15
return {
"score": min(score, 100),
"reasoning": f"Heuristic score (LLM unavailable: {error})",
}

View File

@@ -0,0 +1,137 @@
"""
Risk Agent — Assesses security risks and breaking changes.
This agent uses LLM reasoning to analyze signals for potential risks
including security vulnerabilities, breaking API changes, and deprecation
notices. It's a legitimate agent because risk assessment requires
contextual understanding of technical implications.
Model Selection:
Uses a structured-reasoning model (gpt-4.1-mini by default) because
risk assessment benefits from careful, step-by-step analysis. For
production workloads with high-stakes decisions, consider upgrading
to gpt-4.1 or o4-mini via the MODEL_RISK environment variable.
"""
import json
from typing import Dict, Any, List
from agno.agent import Agent
from agno.models.openai import OpenAIChat
# Central model config — override via MODEL_RISK env var
import os
DEFAULT_MODEL = os.environ.get("MODEL_RISK", "gpt-4.1-mini")
class RiskAgent:
"""
Agent that assesses risk levels in technical signals.
Why this IS an agent:
Risk assessment requires reasoning about technical implications,
understanding security contexts, and making judgment calls about
severity. This is inherently a reasoning task.
Responsibilities:
- Identify security vulnerabilities
- Flag breaking changes
- Detect deprecation notices
- Rate overall risk level (LOW / MEDIUM / HIGH / CRITICAL)
"""
RISK_LEVELS = ["LOW", "MEDIUM", "HIGH", "CRITICAL"]
def __init__(self, model_id: str = None):
"""
Initialize the Risk Agent.
Args:
model_id: OpenAI model to use. Defaults to gpt-4.1-mini.
"""
self.model_id = model_id or DEFAULT_MODEL
self.agent = Agent(
name="Risk Assessor",
model=OpenAIChat(id=self.model_id),
role="Assesses security and breaking change risks in technical signals",
instructions=[
"Analyze signals for security vulnerabilities.",
"Identify breaking changes that may affect developers.",
"Flag deprecation notices and migration requirements.",
"Rate risk level: LOW, MEDIUM, HIGH, or CRITICAL.",
],
markdown=True,
)
def assess(self, signal: Dict[str, Any]) -> Dict[str, Any]:
"""
Assess risk level of a signal.
Attempts LLM assessment first, falls back to keyword heuristics.
"""
prompt = f"""Analyze this technical signal for risks:
Signal:
- Source: {signal.get('source', 'unknown')}
- Title: {signal.get('title', 'Untitled')}
- Description: {signal.get('description', '')[:500]}
Assess for:
1. Security vulnerabilities
2. Breaking changes
3. Deprecations
Respond with ONLY a JSON object:
{{"risk_level": "LOW|MEDIUM|HIGH|CRITICAL", "concerns": ["<list of concerns>"], "breaking_changes": true|false}}"""
try:
response = self.agent.run(prompt, stream=False)
return self._parse_response(response.content, signal)
except Exception as e:
return self._fallback_assessment(signal, str(e))
def assess_batch(self, signals: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Assess multiple signals, returning each with a 'risk' key."""
assessed = []
for signal in signals:
result = self.assess(signal)
assessed.append({**signal, "risk": result})
return assessed
def _parse_response(self, content: str, signal: Dict) -> Dict[str, Any]:
"""Parse LLM JSON response into structured output."""
try:
text = content.strip()
if "```" in text:
text = text.split("```")[1].replace("json", "").strip()
return json.loads(text)
except (json.JSONDecodeError, IndexError):
return self._fallback_assessment(signal, "Parse error")
def _fallback_assessment(self, signal: Dict, error: str) -> Dict[str, Any]:
"""
Keyword-based fallback when LLM is unavailable.
Simple heuristic: scan title for risk-indicating keywords.
This is intentionally conservative — better to over-flag than miss.
"""
title = signal.get("title", "").lower()
risk_level = "LOW"
concerns = []
risk_keywords = {
"HIGH": ["vulnerability", "exploit", "cve", "critical", "breach"],
"MEDIUM": ["breaking", "deprecated", "removed", "migration"],
}
for level, keywords in risk_keywords.items():
if any(kw in title for kw in keywords):
risk_level = level
concerns.append(f"Keyword match: {level}")
break
return {
"risk_level": risk_level,
"concerns": concerns or [f"Heuristic (LLM unavailable: {error})"],
"breaking_changes": "breaking" in title,
}

View File

@@ -0,0 +1,79 @@
"""
Signal Collector — Pure utility (not an agent).
Signal collection is deterministic and intentionally not agent-driven.
This module aggregates signals from adapters, normalizes them to a unified
schema, and deduplicates deterministically. No LLM reasoning is involved
because collection/normalization is a mechanical transformation — using an
agent here would be decorative, not functional.
Design Decision:
Agents are used only where reasoning is required. Signal collection
involves no ambiguity, judgment, or language understanding — it's a
pipeline transformation. Wrapping it in an Agent class would mislead
readers into thinking an LLM call is necessary here.
"""
from typing import List, Dict, Any
from datetime import datetime, timezone
class SignalCollector:
"""
Utility that collects and normalizes signals from multiple sources.
NOT an agent — no LLM calls. This is an intentional design choice.
See module docstring for rationale.
Responsibilities:
- Normalize signals to unified schema
- Deduplicate deterministically (source:id composite key)
- Filter incomplete signals
"""
def collect(self, signals: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Normalize and deduplicate raw signals from adapters.
Args:
signals: Raw signals from adapters (heterogeneous schemas).
Returns:
List of normalized, deduplicated signal dictionaries.
"""
normalized = []
seen_ids = set()
for signal in signals:
# Deterministic dedup key: source + external id
signal_id = f"{signal.get('source', 'unknown')}:{signal.get('id', '')}"
if signal_id in seen_ids:
continue
seen_ids.add(signal_id)
# Normalize to unified schema
normalized.append({
"id": signal.get("id", ""),
"source": signal.get("source", "unknown"),
"title": signal.get("title", "Untitled"),
"description": signal.get("description", ""),
"url": signal.get("url", ""),
"metadata": signal.get("metadata", {}),
"collected_at": datetime.now(timezone.utc).isoformat(),
})
return normalized
def summarize_collection(self, signals: List[Dict[str, Any]]) -> str:
"""
Generate a human-readable collection summary.
Pure string formatting — no LLM needed.
"""
sources: Dict[str, int] = {}
for s in signals:
src = s.get("source", "unknown")
sources[src] = sources.get(src, 0) + 1
parts = [f"{count} from {src}" for src, count in sources.items()]
return f"Collected {len(signals)} signals: {', '.join(parts)}"

View File

@@ -0,0 +1,167 @@
"""
Synthesis Agent — Produces final intelligence digest.
This agent combines outputs from all previous stages to create a
comprehensive, actionable intelligence summary. It's the most reasoning-
intensive agent in the pipeline and uses the strongest available model.
Model Selection:
Uses gpt-4.1 by default — the strongest reasoning model — because
synthesis requires cross-referencing multiple signals, identifying
patterns, generating executive summaries, and producing actionable
recommendations. This is the one stage where model quality directly
impacts output quality.
Override via MODEL_SYNTHESIS env var for cost optimization.
"""
from typing import Dict, Any, List
from datetime import datetime, timezone
from agno.agent import Agent
from agno.models.openai import OpenAIChat
# Central model config — override via MODEL_SYNTHESIS env var
import os
DEFAULT_MODEL = os.environ.get("MODEL_SYNTHESIS", "gpt-4.1")
class SynthesisAgent:
"""
Agent that synthesizes all signal intelligence into a final digest.
Why this uses the strongest model:
Synthesis is the most reasoning-intensive task in the pipeline.
It must cross-reference relevance scores, risk assessments, and
source metadata to produce coherent, actionable intelligence.
Using a weaker model here would produce generic, shallow outputs.
Responsibilities:
- Combine relevance and risk assessments
- Prioritize signals by importance
- Generate executive summary
- Produce actionable recommendations
"""
def __init__(self, model_id: str = None):
"""
Initialize the Synthesis Agent.
Args:
model_id: OpenAI model to use. Defaults to gpt-4.1 (strongest reasoning).
"""
self.model_id = model_id or DEFAULT_MODEL
self.agent = Agent(
name="Intelligence Synthesizer",
model=OpenAIChat(id=self.model_id),
role="Synthesizes technical signals into actionable intelligence digests",
instructions=[
"Combine relevance scores and risk assessments.",
"Prioritize by: high relevance + critical risks first.",
"Generate an executive summary.",
"Provide actionable recommendations for developers.",
],
markdown=True,
)
def synthesize(self, signals: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Synthesize signals into a final intelligence digest.
This method uses deterministic logic for prioritization and grouping,
then delegates summary generation to either LLM or heuristics.
"""
prioritized = self._prioritize_signals(signals)
grouped = self._group_by_source(prioritized)
summary = self._generate_summary(prioritized)
return {
"generated_at": datetime.now(timezone.utc).isoformat(),
"total_signals": len(signals),
"executive_summary": summary,
"priority_signals": prioritized[:5],
"signals_by_source": grouped,
"recommendations": self._generate_recommendations(prioritized),
}
def _prioritize_signals(self, signals: List[Dict]) -> List[Dict]:
"""Sort signals by composite priority score (relevance × risk multiplier)."""
def priority_score(signal):
relevance = signal.get("relevance", {}).get("score", 50)
risk_multiplier = {
"CRITICAL": 2.0,
"HIGH": 1.5,
"MEDIUM": 1.0,
"LOW": 0.8,
}.get(signal.get("risk", {}).get("risk_level", "LOW"), 1.0)
return relevance * risk_multiplier
return sorted(signals, key=priority_score, reverse=True)
def _group_by_source(self, signals: List[Dict]) -> Dict[str, List]:
"""Group signals by their source for categorized display."""
grouped: Dict[str, List] = {}
for signal in signals:
source = signal.get("source", "unknown")
grouped.setdefault(source, []).append(signal)
return grouped
def _generate_summary(self, signals: List[Dict]) -> str:
"""Generate executive summary from processed signals."""
if not signals:
return "No signals to summarize."
high_priority = [
s for s in signals if s.get("relevance", {}).get("score", 0) >= 70
]
critical_risks = [
s
for s in signals
if s.get("risk", {}).get("risk_level") in ["HIGH", "CRITICAL"]
]
parts = [f"Analyzed {len(signals)} signals."]
if high_priority:
parts.append(f"{len(high_priority)} high-relevance items detected.")
if critical_risks:
parts.append(
f"⚠️ {len(critical_risks)} signals with elevated risk."
)
if signals:
parts.append(f"Top signal: {signals[0].get('title', 'Unknown')}")
return " ".join(parts)
def _generate_recommendations(self, signals: List[Dict]) -> List[str]:
"""Generate actionable recommendations from signal analysis."""
recommendations = []
critical = [
s
for s in signals
if s.get("risk", {}).get("risk_level") == "CRITICAL"
]
if critical:
recommendations.append(
f"🚨 Review {len(critical)} critical-risk signals immediately"
)
high_rel = [
s for s in signals if s.get("relevance", {}).get("score", 0) >= 80
]
if high_rel:
recommendations.append(
f"📌 Prioritize {len(high_rel)} high-relevance items"
)
github = [s for s in signals if s.get("source") == "github"]
if github:
recommendations.append(
f"⭐ Explore {len(github)} trending repositories"
)
if not recommendations:
recommendations.append("✅ No urgent actions required")
return recommendations

View File

@@ -0,0 +1,158 @@
"""
DevPulseAI — Multi-Agent Signal Intelligence Pipeline
Demonstrates a production-style multi-agent workflow that aggregates
technical signals from multiple sources, scores them for relevance,
assesses risks, and synthesizes an actionable intelligence digest.
Architecture:
Adapters (fetch) → SignalCollector (normalize) → RelevanceAgent (score)
→ RiskAgent (assess) → SynthesisAgent (digest)
Design Decisions:
- Signal collection is a utility, not an agent (deterministic work).
- Agents are used only where reasoning is required.
- Single provider (OpenAI) by default to reduce onboarding friction.
- Models are chosen by role: fast for classification, strong for synthesis.
Usage:
export OPENAI_API_KEY=sk-...
python main.py
Without API key: agents fall back to heuristic scoring.
"""
import os
from typing import List, Dict, Any, Optional
# Reduced default signal count for faster demo execution
DEFAULT_SIGNAL_LIMIT = 5
# Import adapters
from adapters.github import fetch_github_trending
from adapters.arxiv import fetch_arxiv_papers
from adapters.hackernews import fetch_hackernews_stories
from adapters.medium import fetch_medium_blogs
from adapters.huggingface import fetch_huggingface_models
# Import pipeline components
from agents import (
SignalCollector, # Utility — no LLM
RelevanceAgent, # Agent — gpt-4.1-mini
RiskAgent, # Agent — gpt-4.1-mini
SynthesisAgent, # Agent — gpt-4.1
)
def collect_signals(limit: Optional[int] = None) -> List[Dict[str, Any]]:
"""
Collect signals from all configured sources.
This is pure data aggregation — no LLM involved.
"""
fetch_limit = limit if limit is not None else DEFAULT_SIGNAL_LIMIT
print(f"\n📡 [1/4] Collecting Signals (limit: {fetch_limit} per source)...")
signals = []
print(" → Fetching GitHub trending repos...")
signals.extend(fetch_github_trending(limit=fetch_limit))
print(" → Fetching ArXiv papers...")
signals.extend(fetch_arxiv_papers(limit=fetch_limit))
print(" → Fetching HackerNews stories...")
signals.extend(fetch_hackernews_stories(limit=fetch_limit))
print(" → Fetching Medium blogs...")
signals.extend(fetch_medium_blogs(limit=min(fetch_limit, 3)))
print(" → Fetching HuggingFace models...")
signals.extend(fetch_huggingface_models(limit=fetch_limit))
print(f" ✓ Collected {len(signals)} raw signals")
return signals
def run_pipeline():
"""
Execute the full signal intelligence pipeline.
Pipeline stages:
1. Signal Collection — Aggregate from sources (utility, no LLM)
2. Normalization — Deduplicate and normalize (utility, no LLM)
3. Relevance Score — Rate signals 0-100 (agent, gpt-4.1-mini)
4. Risk Assessment — Identify risks (agent, gpt-4.1-mini)
5. Synthesis — Produce digest (agent, gpt-4.1)
"""
print("=" * 60)
print("🧠 DevPulseAI — Signal Intelligence Pipeline")
print("=" * 60)
# Check for API key
if not os.environ.get("OPENAI_API_KEY"):
print("\n⚠️ Warning: OPENAI_API_KEY not set.")
print(" Agents will use fallback heuristics.\n")
# Stage 1: Collect raw signals from adapters
raw_signals = collect_signals()
# Stage 2: Normalize and deduplicate (utility — no LLM)
collector = SignalCollector()
print("\n🔄 [2/4] Normalizing Signals...")
normalized = collector.collect(raw_signals)
print(f"{collector.summarize_collection(normalized)}")
# Stage 3: Score for relevance (agent — gpt-4.1-mini)
relevance = RelevanceAgent()
print("\n📊 [3/4] Scoring Relevance...")
scored = relevance.score_batch(normalized)
high_relevance = sum(
1 for s in scored if s.get("relevance", {}).get("score", 0) >= 70
)
print(f"{high_relevance}/{len(scored)} signals rated high-relevance")
# Stage 4: Assess risks (agent — gpt-4.1-mini)
risk = RiskAgent()
print("\n⚠️ [4/4] Assessing Risks...")
assessed = risk.assess_batch(scored)
critical = sum(
1
for s in assessed
if s.get("risk", {}).get("risk_level") in ["HIGH", "CRITICAL"]
)
print(f"{critical}/{len(assessed)} signals with elevated risk")
# Stage 5: Synthesize digest (agent — gpt-4.1)
synthesis = SynthesisAgent()
print("\n📋 Generating Intelligence Digest...")
digest = synthesis.synthesize(assessed)
# Output results
print("\n" + "=" * 60)
print("📄 INTELLIGENCE DIGEST")
print("=" * 60)
print(f"\n🕐 Generated: {digest['generated_at']}")
print(f"📦 Total Signals: {digest['total_signals']}")
print(f"\n📝 Summary: {digest['executive_summary']}")
print("\n🎯 Top Priority Signals:")
for i, signal in enumerate(digest.get("priority_signals", [])[:3], 1):
score = signal.get("relevance", {}).get("score", "?")
risk_level = signal.get("risk", {}).get("risk_level", "?")
print(f" {i}. [{signal['source']}] {signal['title'][:50]}...")
print(f" Relevance: {score} | Risk: {risk_level}")
print("\n💡 Recommendations:")
for rec in digest.get("recommendations", []):
print(f"{rec}")
print("\n" + "=" * 60)
print("✅ Pipeline completed successfully!")
print("=" * 60)
return digest
if __name__ == "__main__":
run_pipeline()

View File

@@ -0,0 +1,8 @@
# DevPulseAI Dependencies
# Single provider (OpenAI) by default — no multi-provider setup required.
agno
openai
httpx
feedparser
streamlit>=1.30

View File

@@ -0,0 +1,217 @@
import streamlit as st
import os
from typing import List, Dict, Any
# Import pipeline components from main.py and agents
from main import collect_signals, DEFAULT_SIGNAL_LIMIT
from agents import (
SignalCollectorAgent,
RelevanceAgent,
RiskAgent,
SynthesisAgent
)
# Page Config
st.set_page_config(
page_title="DevPulseAI Signal Intelligence Demo",
page_icon="🧠",
layout="wide"
)
# Custom CSS for glassmorphism and premium feel
st.markdown("""
<style>
.main {
background: #0f172a;
color: #f1f5f9;
font-family: 'Inter', sans-serif;
}
.stApp {
background: linear-gradient(135deg, #0f172a 0%, #1e293b 100%);
}
.signal-card {
background: rgba(30, 41, 59, 0.7);
backdrop-filter: blur(10px);
border-radius: 12px;
padding: 20px;
margin-bottom: 20px;
border: 1px solid rgba(255, 255, 255, 0.1);
transition: transform 0.2s ease;
}
.signal-card:hover {
transform: translateY(-5px);
border-color: #3b82f6;
}
.badge {
padding: 4px 12px;
border-radius: 20px;
font-size: 0.8rem;
font-weight: 600;
text-transform: uppercase;
}
.risk-low { background: #059669; color: white; }
.risk-medium { background: #d97706; color: white; }
.risk-high { background: #dc2626; color: white; }
.risk-critical { background: #7f1d1d; color: white; }
.relevance-score {
font-size: 1.5rem;
font-weight: 700;
color: #3b82f6;
}
</style>
""", unsafe_allow_html=True)
# Title and Description
st.title("🧠 DevPulseAI Signal Intelligence Demo")
st.markdown("""
This demo showcases a **multi-agent system** that aggregates technical signals from various developer sources,
scores them for relevance, identifies potential risks, and synthesizes a final intelligence digest.
""")
# Sidebar Configuration
st.sidebar.header("⚙️ Pipeline Configuration")
# API Key
api_key = st.sidebar.text_input("Gemini API Key (optional)", type="password", help="Provide a Google Gemini API key. If not provided, agents will use fallback heuristic logic.")
if api_key:
# Agno's GoogleGemini looks for GOOGLE_API_KEY
os.environ["GOOGLE_API_KEY"] = api_key
# Source Selection
sources = st.sidebar.multiselect(
"Signal Sources",
["GitHub", "ArXiv", "HackerNews", "Medium", "HuggingFace"],
default=["GitHub", "ArXiv", "HackerNews", "Medium", "HuggingFace"]
)
# Signal Count Slider
signal_count = st.sidebar.slider(
"Signals per source",
min_value=4,
max_value=32,
value=DEFAULT_SIGNAL_LIMIT,
step=4
)
run_button = st.sidebar.button("🚀 Run Intelligence Pipeline", use_container_width=True)
# Main Area Logic
if run_button:
if not sources:
st.warning("Please select at least one signal source.")
else:
# Initialize Agents
collector = SignalCollectorAgent()
relevance = RelevanceAgent()
risk = RiskAgent()
synthesis = SynthesisAgent()
# Step 1: Collection
with st.status("📡 Collecting and normalizing signals...", expanded=True) as status:
st.write("Fetching raw data from sources...")
# Map selected sources to fetch calls (simplified reuse)
# We use the collect_signals logic but filter by selected sources
raw_signals = []
from adapters.github import fetch_github_trending
from adapters.arxiv import fetch_arxiv_papers
from adapters.hackernews import fetch_hackernews_stories
from adapters.medium import fetch_medium_blogs
from adapters.huggingface import fetch_huggingface_models
if "GitHub" in sources:
st.write("Fetching GitHub trending...")
raw_signals.extend(fetch_github_trending(limit=signal_count))
if "ArXiv" in sources:
st.write("Fetching ArXiv papers...")
raw_signals.extend(fetch_arxiv_papers(limit=signal_count))
if "HackerNews" in sources:
st.write("Fetching HackerNews stories...")
raw_signals.extend(fetch_hackernews_stories(limit=signal_count))
if "Medium" in sources:
st.write("Fetching Medium blogs...")
raw_signals.extend(fetch_medium_blogs(limit=min(signal_count, 3)))
if "HuggingFace" in sources:
st.write("Fetching HuggingFace models...")
raw_signals.extend(fetch_huggingface_models(limit=signal_count))
st.write(f"Normalizing {len(raw_signals)} raw signals...")
normalized = collector.collect(raw_signals)
status.update(label=f"{len(normalized)} unique signals collected", state="complete")
# Step 2: Analysis
col1, col2 = st.columns(2)
with col1:
with st.status("📊 Scoring Relevance...") as status:
scored = relevance.score_batch(normalized)
status.update(label="✅ Relevance scoring complete", state="complete")
with col2:
with st.status("⚠️ Assessing Security Risks...") as status:
assessed = risk.assess_batch(scored)
status.update(label="✅ Risk assessment complete", state="complete")
# Step 3: Synthesis
with st.status("📋 Generating Intelligence Digest...") as status:
digest = synthesis.synthesize(assessed)
status.update(label="✅ Final synthesis complete", state="complete")
# Display Results
st.divider()
st.header("📄 Intelligence Digest")
# Executive Summary
st.info(f"**Executive Summary:** {digest['executive_summary']}")
# Recommendations
st.subheader("💡 Recommendations")
for rec in digest['recommendations']:
st.write(f"{rec}")
st.divider()
st.subheader("🎯 Priority Signals")
# Display signals in expandable sections
for signal in assessed:
rel = signal.get("relevance", {})
risk_info = signal.get("risk", {})
risk_level = risk_info.get("risk_level", "UNKNOWN")
with st.expander(f"[{signal['source'].upper()}] {signal['title']}"):
col_a, col_b = st.columns([3, 1])
with col_a:
st.write(f"**Description:** {signal['description']}")
st.write(f"**URL:** [{signal['url']}]({signal['url']})")
if risk_info.get("concerns"):
st.markdown("**Security Concerns:**")
for concern in risk_info["concerns"]:
st.write(f"- {concern}")
with col_b:
st.markdown("<div style='text-align: center'>", unsafe_allow_html=True)
st.markdown(f"<div class='relevance-score'>{rel.get('score', 0)}</div>", unsafe_allow_html=True)
st.markdown("<small>RELEVANCE</small>", unsafe_allow_html=True)
risk_class = f"risk-{risk_level.lower()}"
st.markdown(f"<div class='badge {risk_class}' style='margin-top:10px'>{risk_level} RISK</div>", unsafe_allow_html=True)
st.markdown("</div>", unsafe_allow_html=True)
if rel.get("reasoning"):
st.caption(f"Reason: {rel['reasoning']}")
else:
# Landing state
st.image("https://raw.githubusercontent.com/Shubhamsaboo/awesome-llm-apps/main/advanced_ai_agents/multi_agent_apps/devpulse_ai/assets/logo.png", width=200) # Placeholder for logo logic
st.info("👈 Use the sidebar to configure the pipeline and click 'Run' to begin.")
# Educational Section
with st.expander("🛠️ How it works", expanded=True):
st.markdown("""
1. **Collector Agent**: Gathers data from GitHub, ArXiv, HN, Medium, and HuggingFace.
2. **Relevance Agent**: LLM analysis to score each signal for developer impact.
3. **Risk Agent**: Scans for breaking changes, vulnerabilities, or deprecations.
4. **Synthesis Agent**: Combines all findings into an actionable report.
""")

View File

@@ -0,0 +1,230 @@
"""
DevPulseAI Verification Script
Verifies the complete pipeline using MOCK DATA ONLY.
No network calls, no API keys, no external dependencies required.
Usage:
python verify.py
Expected output:
[OK] DevPulseAI reference pipeline executed successfully
Runs in <1s on any machine.
"""
import sys
import time
from typing import List, Dict, Any
# ────────────────────────────────────────────────
# Mock signal data — representative of real adapter output
# ────────────────────────────────────────────────
MOCK_SIGNALS = [
{
"id": "mock-gh-001",
"source": "github",
"title": "awesome-llm-apps",
"description": "A curated collection of awesome LLM apps built with RAG and AI agents.",
"url": "https://github.com/Shubhamsaboo/awesome-llm-apps",
"metadata": {"stars": 5000, "language": "Python", "topics": ["llm", "ai"]},
},
{
"id": "mock-arxiv-001",
"source": "arxiv",
"title": "Attention Is All You Need: Revisited",
"description": "A comprehensive analysis of transformer architectures.",
"url": "https://arxiv.org/abs/2401.00001",
"metadata": {"pdf": "https://arxiv.org/pdf/2401.00001", "published": "2024-01-15"},
},
{
"id": "mock-hn-001",
"source": "hackernews",
"title": "GPT-5 Breaking Changes in API",
"description": "OpenAI announces breaking changes to the Chat Completions API.",
"url": "https://news.ycombinator.com/item?id=12345",
"metadata": {"points": 500, "comments": 200, "author": "techwriter"},
},
{
"id": "mock-medium-001",
"source": "medium",
"title": "Building Production RAG Systems",
"description": "A deep dive into building scalable retrieval-augmented generation.",
"url": "https://medium.com/@techblog/building-rag",
"metadata": {"author": "TechBlog", "published": "2024-01-20"},
},
{
"id": "mock-hf-001",
"source": "huggingface",
"title": "HF Model: meta-llama/Llama-3-8B",
"description": "Pipeline: text-generation | Downloads: 1,000,000 | Likes: 5,000",
"url": "https://huggingface.co/meta-llama/Llama-3-8B",
"metadata": {"downloads": 1000000, "likes": 5000, "pipeline_tag": "text-generation"},
},
]
# ────────────────────────────────────────────────
# Verification steps
# ────────────────────────────────────────────────
def verify_imports():
"""Verify all modules can be imported without errors."""
print("[1/5] Verifying imports...")
from agents import SignalCollector, RelevanceAgent, RiskAgent, SynthesisAgent
from adapters.github import fetch_github_trending
from adapters.arxiv import fetch_arxiv_papers
from adapters.hackernews import fetch_hackernews_stories
from adapters.medium import fetch_medium_blogs
from adapters.huggingface import fetch_huggingface_models
# Verify SignalCollector is NOT an agent (no .agent attribute)
collector = SignalCollector()
assert not hasattr(collector, "agent"), (
"SignalCollector should NOT have an .agent attribute — "
"it's a utility, not an agent"
)
print(" ✓ All modules imported successfully")
print(" ✓ SignalCollector confirmed as utility (no LLM)")
return True
def verify_signal_collector():
"""Verify SignalCollector normalizes and deduplicates correctly."""
print("[2/5] Verifying Signal Collector (utility)...")
from agents import SignalCollector
collector = SignalCollector()
# Test normalization
normalized = collector.collect(MOCK_SIGNALS)
assert len(normalized) == len(MOCK_SIGNALS), "Signal count mismatch"
assert all("collected_at" in s for s in normalized), "Missing timestamp"
# Test deduplication — adding duplicates should not increase count
duped = MOCK_SIGNALS + [MOCK_SIGNALS[0]] # One duplicate
deduped = collector.collect(duped)
assert len(deduped) == len(MOCK_SIGNALS), "Deduplication failed"
summary = collector.summarize_collection(normalized)
print(f"{summary}")
print(" ✓ Deduplication works correctly")
return normalized
def verify_relevance_agent(signals: List[Dict]):
"""Verify RelevanceAgent fallback scoring works without API key."""
print("[3/5] Verifying Relevance Agent (fallback mode)...")
from agents import RelevanceAgent
agent = RelevanceAgent()
# Use fallback scoring directly (no API key needed)
scored = []
for signal in signals:
result = agent._fallback_score(signal, "Mock mode")
scored.append({**signal, "relevance": result})
assert all("relevance" in s for s in scored), "Missing relevance scores"
assert all(
0 <= s["relevance"]["score"] <= 100 for s in scored
), "Scores out of range"
print(f" ✓ Scored {len(scored)} signals (heuristic fallback)")
return scored
def verify_risk_agent(signals: List[Dict]):
"""Verify RiskAgent fallback assessment works without API key."""
print("[4/5] Verifying Risk Agent (fallback mode)...")
from agents import RiskAgent
agent = RiskAgent()
assessed = []
for signal in signals:
result = agent._fallback_assessment(signal, "Mock mode")
assessed.append({**signal, "risk": result})
assert all("risk" in s for s in assessed), "Missing risk assessments"
# Verify keyword detection: "GPT-5 Breaking Changes" should flag
breaking = [s for s in assessed if s.get("risk", {}).get("breaking_changes")]
assert len(breaking) >= 1, "Breaking change detection failed"
print(f" ✓ Assessed {len(assessed)} signals ({len(breaking)} with breaking changes)")
return assessed
def verify_synthesis_agent(signals: List[Dict]):
"""Verify SynthesisAgent produces valid digest structure."""
print("[5/5] Verifying Synthesis Agent...")
from agents import SynthesisAgent
agent = SynthesisAgent()
digest = agent.synthesize(signals)
assert "generated_at" in digest, "Missing timestamp"
assert "executive_summary" in digest, "Missing summary"
assert "recommendations" in digest, "Missing recommendations"
assert "priority_signals" in digest, "Missing priority signals"
assert digest["total_signals"] == len(signals), "Signal count mismatch"
print(f" ✓ Generated digest with {len(digest['recommendations'])} recommendations")
return digest
# ────────────────────────────────────────────────
# Main verification runner
# ────────────────────────────────────────────────
def run_verification():
"""Run the complete verification suite with timing."""
print("=" * 60)
print("🔍 DevPulseAI Verification Suite")
print("=" * 60)
print("\nUsing MOCK DATA — No network calls or API keys required.\n")
start = time.time()
try:
if not verify_imports():
raise AssertionError("Import verification failed")
normalized = verify_signal_collector()
scored = verify_relevance_agent(normalized)
assessed = verify_risk_agent(scored)
digest = verify_synthesis_agent(assessed)
elapsed = time.time() - start
# Final summary
print("\n" + "=" * 60)
print("📊 Verification Summary")
print("=" * 60)
print(f" • Signals processed: {digest['total_signals']}")
print(f" • Summary: {digest['executive_summary']}")
print(f" • Recommendations: {len(digest['recommendations'])}")
print(f" • Time elapsed: {elapsed:.3f}s")
print("\n" + "=" * 60)
print("[OK] DevPulseAI reference pipeline executed successfully")
print("=" * 60)
return True
except Exception as e:
print(f"\n[FAIL] Verification failed: {e}")
import traceback
traceback.print_exc()
return False
if __name__ == "__main__":
success = run_verification()
sys.exit(0 if success else 1)

View File

@@ -0,0 +1,254 @@
{
"name": "Signal Intelligence Ingestion Pipeline",
"description": "Optional n8n workflow for automating the DevPulseAI signal intelligence pipeline. Import into n8n to schedule daily digest generation.",
"nodes": [
{
"parameters": {},
"id": "trigger-cron",
"name": "Daily Trigger",
"type": "n8n-nodes-base.scheduleTrigger",
"typeVersion": 1.1,
"position": [
0,
300
],
"notes": "Triggers the pipeline daily at configured time"
},
{
"parameters": {
"httpMethod": "POST",
"path": "trigger-pulse",
"responseMode": "responseNode"
},
"id": "webhook-trigger",
"name": "Webhook Trigger",
"type": "n8n-nodes-base.webhook",
"typeVersion": 2,
"position": [
0,
500
],
"webhookId": "devpulse-trigger",
"notes": "Manual trigger via POST request"
},
{
"parameters": {
"url": "https://api.github.com/search/repositories?q=stars:>1000&sort=stars&order=desc&per_page=10",
"options": {
"response": {
"response": {
"responseFormat": "json"
}
}
}
},
"id": "github-adapter",
"name": "GitHub Trending Repos",
"type": "n8n-nodes-base.httpRequest",
"typeVersion": 4.2,
"position": [
400,
100
],
"notes": "Fetches trending GitHub repositories"
},
{
"parameters": {
"url": "http://export.arxiv.org/api/query?search_query=cat:cs.AI+OR+cat:cs.LG&sortBy=submittedDate&sortOrder=descending&max_results=10"
},
"id": "arxiv-adapter",
"name": "ArXiv Papers",
"type": "n8n-nodes-base.httpRequest",
"typeVersion": 4.2,
"position": [
400,
400
],
"notes": "Fetches latest AI/ML research papers"
},
{
"parameters": {
"url": "https://hn.algolia.com/api/v1/search_by_date?query=AI&tags=story&hitsPerPage=10"
},
"id": "hackernews-adapter",
"name": "HackerNews Top Stories",
"type": "n8n-nodes-base.httpRequest",
"typeVersion": 4.2,
"position": [
400,
550
],
"notes": "Fetches top HackerNews stories"
},
{
"parameters": {
"mode": "multiplex"
},
"id": "merge-signals",
"name": "Aggregate Signals",
"type": "n8n-nodes-base.merge",
"typeVersion": 3,
"position": [
650,
400
],
"notes": "Combines all signal sources into unified stream"
},
{
"parameters": {
"resource": "chat",
"model": "gpt-4o-mini",
"prompt": {
"messages": [
{
"role": "system",
"content": "You are a Relevance Scoring Agent. Score the following content from 0-100 based on its relevance to AI/ML developers. Return ONLY a JSON object: {\"score\": <number>, \"reason\": \"<1 sentence>\"}"
},
{
"role": "user",
"content": "Title: {{ $json.title }}\nDescription: {{ $json.description }}"
}
]
},
"options": {
"temperature": 0.1,
"maxOutputTokens": 100
}
},
"id": "relevance-agent",
"name": "Relevance Agent",
"type": "@n8n/n8n-nodes-langchain.lmChatOpenAi",
"typeVersion": 1,
"position": [
1100,
400
],
"notes": "Scores content 0-100 based on developer relevance"
},
{
"parameters": {
"resource": "chat",
"model": "gpt-4o-mini",
"prompt": {
"messages": [
{
"role": "system",
"content": "You are a Risk Assessment Agent. Analyze for: breaking changes, security vulnerabilities, or deprecations. Return ONLY a JSON object: {\"risk_level\": \"HIGH|MEDIUM|LOW\", \"concerns\": [\"<list>\"]}"
},
{
"role": "user",
"content": "Title: {{ $json.title }}\nDescription: {{ $json.description }}"
}
]
},
"options": {
"temperature": 0.1,
"maxOutputTokens": 150
}
},
"id": "risk-agent",
"name": "Risk Agent",
"type": "@n8n/n8n-nodes-langchain.lmChatOpenAi",
"typeVersion": 1,
"position": [
1100,
600
],
"notes": "Flags breaking changes and security vulnerabilities"
}
],
"connections": {
"Daily Trigger": {
"main": [
[
{
"node": "GitHub Trending Repos",
"type": "main",
"index": 0
}
]
]
},
"Webhook Trigger": {
"main": [
[
{
"node": "GitHub Trending Repos",
"type": "main",
"index": 0
}
]
]
},
"GitHub Trending Repos": {
"main": [
[
{
"node": "Aggregate Signals",
"type": "main",
"index": 0
}
]
]
},
"ArXiv Papers": {
"main": [
[
{
"node": "Aggregate Signals",
"type": "main",
"index": 1
}
]
]
},
"HackerNews Top Stories": {
"main": [
[
{
"node": "Aggregate Signals",
"type": "main",
"index": 2
}
]
]
},
"Aggregate Signals": {
"main": [
[
{
"node": "Relevance Agent",
"type": "main",
"index": 0
},
{
"node": "Risk Agent",
"type": "main",
"index": 0
}
]
]
}
},
"settings": {
"executionOrder": "v1"
},
"tags": [
{
"name": "signal-intelligence"
},
{
"name": "automation"
},
{
"name": "developer-tools"
},
{
"name": "ai-agents"
}
],
"meta": {
"templateCredsSetupCompleted": false,
"instanceId": "devpulse-reference"
}
}