diff --git a/advanced_ai_agents/multi_agent_apps/multi_agent_trust_layer/README.md b/advanced_ai_agents/multi_agent_apps/multi_agent_trust_layer/README.md new file mode 100644 index 0000000..06d609f --- /dev/null +++ b/advanced_ai_agents/multi_agent_apps/multi_agent_trust_layer/README.md @@ -0,0 +1,230 @@ +# 🀝 Multi-Agent Trust Layer - Secure Agent-to-Agent Communication + +Learn how to build a trust layer for multi-agent systems that enables secure delegation, trust scoring, and policy enforcement between AI agents. + +## Features + +- **Agent Identity**: Each agent has a verifiable identity with a human sponsor +- **Trust Scoring**: Behavioral monitoring with a 0-1000 trust score +- **Delegation Chains**: Cryptographically narrow scope when delegating tasks +- **Policy Enforcement**: Enforce compliance rules across agent interactions +- **Audit Trail**: Full observability of agent-to-agent communications + +## How It Works + +``` +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ Agent A │◀───────▢│ Trust Layer β”‚ +β”‚ (Orchestrator) β”‚ TLS β”‚ β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ β€’ Identity β”‚ + β”‚ β€’ Trust Score β”‚ +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ β€’ Delegation β”‚ +β”‚ Agent B │◀───────▢│ β€’ Policy β”‚ +β”‚ (Specialist) β”‚ TLS β”‚ β€’ Audit β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +``` + +1. **Registration**: Agents register with verified identity and human sponsor +2. **Trust Establishment**: Initial trust score based on sponsor reputation +3. **Delegation**: Parent agents can delegate tasks with narrowed permissions +4. **Monitoring**: All actions are tracked and trust scores updated +5. 
**Enforcement**: Policies determine what each agent can do
+
+## Requirements
+
+- Python 3.9+ (the code uses built-in generic type hints such as `tuple[bool, str]`)
+- OpenAI API key (or any LLM provider)
+- Required Python packages (see `requirements.txt`)
+
+## Installation
+
+1. Clone this repository:
+   ```bash
+   git clone https://github.com/Shubhamsaboo/awesome-llm-apps.git
+   cd advanced_ai_agents/multi_agent_apps/multi_agent_trust_layer
+   ```
+
+2. Install the required packages:
+   ```bash
+   pip install -r requirements.txt
+   ```
+
+## Usage
+
+1. Set your API key:
+   ```bash
+   export OPENAI_API_KEY=your-openai-api-key
+   ```
+
+2. Run the trust layer demo:
+   ```bash
+   python multi_agent_trust_layer.py
+   ```
+
+3. Watch agents interact through the trust layer with full observability.
+
+## Example: Agent Delegation Chain
+
+```python
+# Orchestrator agent creates a delegation for a specialist;
+# create_delegation returns the delegation ID (or None on failure)
+delegation_id = trust_layer.create_delegation(
+    from_agent="orchestrator-001",
+    to_agent="researcher-002",
+    scope={
+        "allowed_actions": ["web_search", "summarize"],
+        "allowed_domains": ["arxiv.org", "github.com"],
+        "max_tokens": 10000
+    },
+    task_description="Research recent papers on AI safety",
+    time_limit_minutes=30
+)
+
+# The researcher (a GovernedAgent) can only act within the delegated scope
+researcher.current_delegation = delegation_id
+result = researcher.execute("web_search", {"query": "AI safety papers 2024"})
+```
+
+## Trust Score System
+
+Trust scores range from 0 to 1000:
+
+| Score Range | Level | Permissions |
+|-------------|-------|-------------|
+| 900-1000 | Trusted | Full access within role |
+| 700-899 | Standard | Normal operations |
+| 500-699 | Probation | Limited actions, extra logging |
+| 300-499 | Restricted | Human approval required |
+| 0-299 | Suspended | No autonomous actions |
+
+### Score Updates
+
+```
+# Positive behaviors increase trust
++10: Successfully completed delegated task
++5: Stayed within scope boundaries
++2: Provided accurate information
+
+# Negative behaviors decrease trust
+-50: Attempted action outside scope
+-30: Provided inaccurate information
+-20: Exceeded resource limits
+-100: Security violation
+```
+
+## Example Output
+
+```
+🀝 Multi-Agent Trust Layer Demo
+================================
+
+πŸ“‹ Registering agents...
+βœ… Registered: orchestrator-001 (Human Sponsor: alice@company.com)
+βœ… Registered: researcher-002 (Human Sponsor: bob@company.com)
+βœ… Registered: writer-003 (Human Sponsor: carol@company.com)
+
+πŸ” Creating delegation chain...
+βœ… Delegation: orchestrator-001 β†’ researcher-002
+   Scope: web_search, summarize
+   Time Limit: 30 minutes
+
+πŸ€– Agent researcher-002 executing: web_search
+   Query: "AI safety papers 2024"
+βœ… Action ALLOWED (within delegated scope)
+   Trust Score: 750 β†’ 755 (+5)
+
+πŸ€– Agent researcher-002 executing: send_email
+❌ Action DENIED (not in delegated scope)
+   Trust Score: 755 β†’ 705 (-50)
+
+πŸ“Š Trust Scores:
+   orchestrator-001: 900 (Trusted)
+   researcher-002: 705 (Standard)
+   writer-003: 700 (Standard)
+```
+
+## Key Concepts
+
+### 1. Agent Identity
+
+Every agent has a cryptographic identity tied to a human sponsor:
+
+```python
+@dataclass
+class AgentIdentity:
+    agent_id: str
+    public_key: str
+    human_sponsor: str  # Accountable human
+    organization: str
+    roles: List[str]
+    created_at: datetime
+```
+
+### 2. Delegation Chains
+
+Delegations form a chain where each link can only narrow scope:
+
+```python
+@dataclass
+class Delegation:
+    delegation_id: str
+    parent_agent: str
+    child_agent: str
+    scope: DelegationScope
+    signature: str  # Signed by parent
+    parent_delegation_id: Optional[str]  # Links to parent's delegation
+```
+
+### 3. 
Policy Enforcement + +Policies define what agents can do based on trust and role: + +```python +policies: + researcher: + base_trust_required: 500 + allowed_actions: + - web_search + - read_document + - summarize + denied_actions: + - execute_code + - send_email + resource_limits: + max_tokens_per_hour: 100000 + max_api_calls_per_minute: 60 +``` + +## Architecture + +``` +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ Trust Layer β”‚ +β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€ +β”‚ Identity β”‚ Trust β”‚ Delegation β”‚ Policy β”‚ +β”‚ Registry β”‚ Scoring β”‚ Manager β”‚ Engine β”‚ +β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€ +β”‚ Audit Logger β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β–² β–² β–² + β”‚ β”‚ β”‚ + β”Œβ”€β”€β”€β”€β”΄β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”΄β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”΄β”€β”€β”€β”€β” + β”‚ Agent A β”‚ β”‚ Agent B β”‚ β”‚ Agent C β”‚ + β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +``` + +## Extending the Tutorial + +- Add cryptographic signatures for delegation verification +- Implement reputation systems across organizations +- Add real-time trust score visualization +- Connect to external identity providers (OAuth, SAML) +- Implement secure communication channels (mTLS) + +## Related Projects + +- [LangGraph](https://github.com/langchain-ai/langgraph) - Multi-agent orchestration +- [CrewAI](https://github.com/joaomdmoura/crewAI) - Multi-agent framework +- 
[AutoGen](https://github.com/microsoft/autogen) - Multi-agent conversations diff --git a/advanced_ai_agents/multi_agent_apps/multi_agent_trust_layer/multi_agent_trust_layer.py b/advanced_ai_agents/multi_agent_apps/multi_agent_trust_layer/multi_agent_trust_layer.py new file mode 100644 index 0000000..cfa2862 --- /dev/null +++ b/advanced_ai_agents/multi_agent_apps/multi_agent_trust_layer/multi_agent_trust_layer.py @@ -0,0 +1,782 @@ +""" +🀝 Multi-Agent Trust Layer Tutorial + +This tutorial demonstrates how to build a trust layer for multi-agent systems +that enables secure delegation, trust scoring, and policy enforcement. + +Key concepts: +- Agent identity and registration +- Trust scoring based on behavior +- Delegation chains with scope narrowing +- Policy enforcement across agent interactions +- Full audit trail of agent communications +""" + +import os +import json +import hashlib +import secrets +import logging +from datetime import datetime, timedelta +from typing import Dict, Any, List, Optional, Set +from dataclasses import dataclass, field +from enum import Enum +from collections import defaultdict + +from openai import OpenAI + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + + +# ============================================================================ +# TRUST LEVELS +# ============================================================================ + +class TrustLevel(Enum): + """Trust levels based on score ranges""" + SUSPENDED = "suspended" # 0-299 + RESTRICTED = "restricted" # 300-499 + PROBATION = "probation" # 500-699 + STANDARD = "standard" # 700-899 + TRUSTED = "trusted" # 900-1000 + + @classmethod + def from_score(cls, score: int) -> "TrustLevel": + if score >= 900: + return cls.TRUSTED + elif score >= 700: + return cls.STANDARD + elif score >= 500: + return cls.PROBATION + elif score >= 300: + return cls.RESTRICTED + else: + return 
cls.SUSPENDED + + +# ============================================================================ +# CORE DATA STRUCTURES +# ============================================================================ + +@dataclass +class AgentIdentity: + """Represents an agent's verified identity""" + agent_id: str + public_key: str + human_sponsor: str # Email of accountable human + organization: str + roles: List[str] + created_at: datetime = field(default_factory=datetime.utcnow) + metadata: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class TrustScore: + """Agent's trust score with history""" + agent_id: str + score: int # 0-1000 + level: TrustLevel + history: List[Dict[str, Any]] = field(default_factory=list) + last_updated: datetime = field(default_factory=datetime.utcnow) + + def update(self, delta: int, reason: str): + """Update trust score with bounds checking""" + old_score = self.score + self.score = max(0, min(1000, self.score + delta)) + self.level = TrustLevel.from_score(self.score) + self.last_updated = datetime.utcnow() + self.history.append({ + "timestamp": self.last_updated.isoformat(), + "old_score": old_score, + "new_score": self.score, + "delta": delta, + "reason": reason + }) + + +@dataclass +class DelegationScope: + """Defines what an agent can do under a delegation""" + allowed_actions: Set[str] + denied_actions: Set[str] = field(default_factory=set) + allowed_domains: Set[str] = field(default_factory=set) + max_tokens: int = 10000 + time_limit_minutes: int = 60 + max_sub_delegations: int = 0 # Can this agent delegate further? 
+    custom_constraints: Dict[str, Any] = field(default_factory=dict)
+
+    def allows_action(self, action: str) -> bool:
+        """Check if an action is allowed under this scope"""
+        if action in self.denied_actions:
+            return False
+        if not self.allowed_actions:  # Empty means all allowed
+            return True
+        return action in self.allowed_actions
+
+    def narrow(self, child_scope: "DelegationScope") -> "DelegationScope":
+        """Create a narrowed scope for sub-delegation"""
+        return DelegationScope(
+            # An empty allow-list means "unrestricted", so fall back to the
+            # child's requested actions rather than intersecting with the
+            # empty set (which would leave the child unrestricted too)
+            allowed_actions=(self.allowed_actions & child_scope.allowed_actions) if self.allowed_actions else child_scope.allowed_actions,
+            denied_actions=self.denied_actions | child_scope.denied_actions,
+            allowed_domains=self.allowed_domains & child_scope.allowed_domains if self.allowed_domains else child_scope.allowed_domains,
+            max_tokens=min(self.max_tokens, child_scope.max_tokens),
+            time_limit_minutes=min(self.time_limit_minutes, child_scope.time_limit_minutes),
+            max_sub_delegations=min(self.max_sub_delegations, child_scope.max_sub_delegations),
+            custom_constraints={**self.custom_constraints, **child_scope.custom_constraints}
+        )
+
+
+@dataclass
+class Delegation:
+    """A delegation of authority from one agent to another"""
+    delegation_id: str
+    parent_agent: str
+    child_agent: str
+    scope: DelegationScope
+    task_description: str
+    created_at: datetime
+    expires_at: datetime
+    signature: str  # Signed by parent agent
+    parent_delegation_id: Optional[str] = None  # For chain tracking
+    tokens_used: int = 0
+    is_revoked: bool = False
+
+    def is_valid(self) -> bool:
+        """Check if delegation is still valid"""
+        if self.is_revoked:
+            return False
+        if datetime.utcnow() > self.expires_at:
+            return False
+        if self.tokens_used >= self.scope.max_tokens:
+            return False
+        return True
+
+
+@dataclass
+class AuditEntry:
+    """Audit log entry for agent interactions"""
+    timestamp: datetime
+    event_type: str
+    agent_id: str
+    action: str
+    delegation_id: Optional[str]
+    result: str  # "allowed", "denied", "error"
+    details: Dict[str, Any]
+    trust_impact: int = 0
+
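# ----------------------------------------------------------------------------
# Worked example (illustrative only, not used by the demo below): scope
# narrowing intersects allowed actions and unions denied ones, so authority
# can only shrink as it flows down a delegation chain.
def _scope_narrowing_example() -> set:
    parent_allowed = {"web_search", "summarize", "read_document"}
    child_requested = {"web_search", "send_email"}
    # send_email is not in the parent scope, so the child cannot gain it
    return parent_allowed & child_requested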
+ +# ============================================================================ +# IDENTITY REGISTRY +# ============================================================================ + +class IdentityRegistry: + """Manages agent identities""" + + def __init__(self): + self.identities: Dict[str, AgentIdentity] = {} + self.sponsor_to_agents: Dict[str, List[str]] = defaultdict(list) + + def register(self, identity: AgentIdentity) -> bool: + """Register a new agent identity""" + if identity.agent_id in self.identities: + logger.warning(f"Agent {identity.agent_id} already registered") + return False + + self.identities[identity.agent_id] = identity + self.sponsor_to_agents[identity.human_sponsor].append(identity.agent_id) + logger.info(f"Registered agent: {identity.agent_id} (sponsor: {identity.human_sponsor})") + return True + + def get(self, agent_id: str) -> Optional[AgentIdentity]: + """Get agent identity""" + return self.identities.get(agent_id) + + def revoke(self, agent_id: str, reason: str) -> bool: + """Revoke an agent's identity""" + if agent_id in self.identities: + identity = self.identities.pop(agent_id) + self.sponsor_to_agents[identity.human_sponsor].remove(agent_id) + logger.warning(f"Revoked agent: {agent_id} - {reason}") + return True + return False + + +# ============================================================================ +# TRUST SCORING ENGINE +# ============================================================================ + +class TrustScoringEngine: + """Manages trust scores for agents""" + + # Score adjustments for various events + SCORE_ADJUSTMENTS = { + "task_completed": +10, + "stayed_in_scope": +5, + "accurate_output": +2, + "scope_violation_attempt": -50, + "inaccurate_output": -30, + "resource_exceeded": -20, + "security_violation": -100, + "delegation_success": +15, + "delegation_failure": -25, + } + + def __init__(self, initial_score: int = 700): + self.scores: Dict[str, TrustScore] = {} + self.initial_score = initial_score + + 
def initialize(self, agent_id: str, initial_score: Optional[int] = None) -> TrustScore:
+        """Initialize trust score for a new agent"""
+        # "is not None" so an explicit score of 0 (suspended) is honored
+        score = initial_score if initial_score is not None else self.initial_score
+        trust_score = TrustScore(
+            agent_id=agent_id,
+            score=score,
+            level=TrustLevel.from_score(score)
+        )
+        self.scores[agent_id] = trust_score
+        return trust_score
+
+    def get(self, agent_id: str) -> Optional[TrustScore]:
+        """Get agent's trust score"""
+        return self.scores.get(agent_id)
+
+    def record_event(self, agent_id: str, event_type: str, custom_delta: Optional[int] = None) -> int:
+        """Record an event and update trust score"""
+        if agent_id not in self.scores:
+            self.initialize(agent_id)
+
+        delta = custom_delta if custom_delta is not None else self.SCORE_ADJUSTMENTS.get(event_type, 0)
+        self.scores[agent_id].update(delta, event_type)
+
+        logger.info(f"Trust update: {agent_id} {delta:+d} ({event_type}) β†’ {self.scores[agent_id].score}")
+        return delta
+
+
+# ============================================================================
+# DELEGATION MANAGER
+# ============================================================================
+
+class DelegationManager:
+    """Manages delegation chains between agents"""
+
+    def __init__(self, identity_registry: IdentityRegistry, trust_engine: TrustScoringEngine):
+        self.delegations: Dict[str, Delegation] = {}
+        self.agent_delegations: Dict[str, List[str]] = defaultdict(list)  # agent_id -> [delegation_ids]
+        self.identity_registry = identity_registry
+        self.trust_engine = trust_engine
+
+    def create_delegation(
+        self,
+        parent_agent: str,
+        child_agent: str,
+        scope: DelegationScope,
+        task_description: str,
+        time_limit_minutes: Optional[int] = None,
+        parent_delegation_id: Optional[str] = None
+    ) -> Optional[Delegation]:
+        """Create a new delegation from parent to child agent"""
+
+        # Verify both agents exist
+        if not self.identity_registry.get(parent_agent):
+            logger.error(f"Parent agent not found: {parent_agent}")
+            return None
+        if 
not self.identity_registry.get(child_agent): + logger.error(f"Child agent not found: {child_agent}") + return None + + # Check parent's trust level + parent_trust = self.trust_engine.get(parent_agent) + if parent_trust and parent_trust.level == TrustLevel.SUSPENDED: + logger.error(f"Suspended agent cannot delegate: {parent_agent}") + return None + + # If this is a sub-delegation, narrow the scope + if parent_delegation_id: + parent_del = self.delegations.get(parent_delegation_id) + if not parent_del or not parent_del.is_valid(): + logger.error(f"Invalid parent delegation: {parent_delegation_id}") + return None + if parent_del.scope.max_sub_delegations <= 0: + logger.error(f"No sub-delegations allowed under: {parent_delegation_id}") + return None + scope = parent_del.scope.narrow(scope) + + # Create delegation + delegation_id = f"del-{secrets.token_hex(8)}" + time_limit = time_limit_minutes or scope.time_limit_minutes + + delegation = Delegation( + delegation_id=delegation_id, + parent_agent=parent_agent, + child_agent=child_agent, + scope=scope, + task_description=task_description, + created_at=datetime.utcnow(), + expires_at=datetime.utcnow() + timedelta(minutes=time_limit), + signature=self._sign_delegation(parent_agent, delegation_id), + parent_delegation_id=parent_delegation_id + ) + + self.delegations[delegation_id] = delegation + self.agent_delegations[child_agent].append(delegation_id) + + logger.info(f"Created delegation: {parent_agent} β†’ {child_agent} ({delegation_id})") + return delegation + + def _sign_delegation(self, agent_id: str, delegation_id: str) -> str: + """Create a signature for a delegation (simplified - real impl would use crypto)""" + data = f"{agent_id}:{delegation_id}:{datetime.utcnow().isoformat()}" + return hashlib.sha256(data.encode()).hexdigest()[:16] + + def validate_action(self, agent_id: str, action: str, delegation_id: Optional[str] = None) -> bool: + """Validate if an agent can perform an action under their delegation""" + + if 
delegation_id: + delegation = self.delegations.get(delegation_id) + if not delegation: + return False + if not delegation.is_valid(): + return False + if delegation.child_agent != agent_id: + return False + return delegation.scope.allows_action(action) + + # Check if agent has any valid delegation allowing this action + for del_id in self.agent_delegations.get(agent_id, []): + delegation = self.delegations.get(del_id) + if delegation and delegation.is_valid() and delegation.scope.allows_action(action): + return True + + return False + + def revoke_delegation(self, delegation_id: str, reason: str) -> bool: + """Revoke a delegation""" + if delegation_id in self.delegations: + self.delegations[delegation_id].is_revoked = True + logger.warning(f"Revoked delegation: {delegation_id} - {reason}") + return True + return False + + def get_active_delegations(self, agent_id: str) -> List[Delegation]: + """Get all active delegations for an agent""" + return [ + self.delegations[del_id] + for del_id in self.agent_delegations.get(agent_id, []) + if self.delegations[del_id].is_valid() + ] + + +# ============================================================================ +# POLICY ENGINE +# ============================================================================ + +class MultiAgentPolicyEngine: + """Enforces policies for multi-agent interactions""" + + def __init__(self, trust_engine: TrustScoringEngine, delegation_manager: DelegationManager): + self.trust_engine = trust_engine + self.delegation_manager = delegation_manager + self.role_policies: Dict[str, Dict[str, Any]] = {} + + def add_role_policy(self, role: str, policy: Dict[str, Any]): + """Add a policy for a specific role""" + self.role_policies[role] = policy + + def evaluate( + self, + agent_id: str, + action: str, + roles: List[str], + delegation_id: Optional[str] = None + ) -> tuple[bool, str]: + """Evaluate if an action is allowed""" + + # Check trust level + trust = self.trust_engine.get(agent_id) + if not trust: 
+ return False, "Agent has no trust score" + + if trust.level == TrustLevel.SUSPENDED: + return False, "Agent is suspended" + + # Check role-based policies + for role in roles: + policy = self.role_policies.get(role, {}) + + # Check base trust requirement + min_trust = policy.get("base_trust_required", 0) + if trust.score < min_trust: + return False, f"Trust score {trust.score} below minimum {min_trust} for role {role}" + + # Check denied actions + if action in policy.get("denied_actions", []): + return False, f"Action '{action}' denied for role {role}" + + # Check allowed actions (if specified, action must be in list) + allowed = policy.get("allowed_actions", []) + if allowed and action not in allowed: + return False, f"Action '{action}' not in allowed list for role {role}" + + # Check delegation + if delegation_id: + if not self.delegation_manager.validate_action(agent_id, action, delegation_id): + return False, f"Action '{action}' not allowed under delegation {delegation_id}" + + # Require approval for restricted agents + if trust.level == TrustLevel.RESTRICTED: + return False, "Agent is restricted - requires human approval" + + return True, "Action allowed" + + +# ============================================================================ +# TRUST LAYER (MAIN INTERFACE) +# ============================================================================ + +class TrustLayer: + """Main interface for the multi-agent trust layer""" + + def __init__(self): + self.identity_registry = IdentityRegistry() + self.trust_engine = TrustScoringEngine() + self.delegation_manager = DelegationManager(self.identity_registry, self.trust_engine) + self.policy_engine = MultiAgentPolicyEngine(self.trust_engine, self.delegation_manager) + self.audit_log: List[AuditEntry] = [] + + def register_agent( + self, + agent_id: str, + human_sponsor: str, + organization: str, + roles: List[str], + initial_trust: int = 700 + ) -> bool: + """Register a new agent with the trust layer""" + + # 
Generate a key pair (simplified) + public_key = hashlib.sha256(f"{agent_id}:{secrets.token_hex(16)}".encode()).hexdigest() + + identity = AgentIdentity( + agent_id=agent_id, + public_key=public_key, + human_sponsor=human_sponsor, + organization=organization, + roles=roles + ) + + if self.identity_registry.register(identity): + self.trust_engine.initialize(agent_id, initial_trust) + return True + return False + + def create_delegation( + self, + from_agent: str, + to_agent: str, + scope: Dict[str, Any], + task_description: str, + time_limit_minutes: int = 60 + ) -> Optional[str]: + """Create a delegation from one agent to another""" + + delegation_scope = DelegationScope( + allowed_actions=set(scope.get("allowed_actions", [])), + denied_actions=set(scope.get("denied_actions", [])), + allowed_domains=set(scope.get("allowed_domains", [])), + max_tokens=scope.get("max_tokens", 10000), + time_limit_minutes=time_limit_minutes, + max_sub_delegations=scope.get("max_sub_delegations", 0) + ) + + delegation = self.delegation_manager.create_delegation( + parent_agent=from_agent, + child_agent=to_agent, + scope=delegation_scope, + task_description=task_description, + time_limit_minutes=time_limit_minutes + ) + + return delegation.delegation_id if delegation else None + + def authorize_action( + self, + agent_id: str, + action: str, + delegation_id: Optional[str] = None + ) -> tuple[bool, str]: + """Authorize an action for an agent""" + + identity = self.identity_registry.get(agent_id) + if not identity: + self._log_audit(agent_id, action, "denied", delegation_id, {"reason": "Unknown agent"}) + return False, "Unknown agent" + + allowed, reason = self.policy_engine.evaluate( + agent_id=agent_id, + action=action, + roles=identity.roles, + delegation_id=delegation_id + ) + + # Update trust score based on result + if allowed: + self.trust_engine.record_event(agent_id, "stayed_in_scope") + else: + self.trust_engine.record_event(agent_id, "scope_violation_attempt") + + 
self._log_audit( + agent_id, action, + "allowed" if allowed else "denied", + delegation_id, + {"reason": reason} + ) + + return allowed, reason + + def record_task_result(self, agent_id: str, delegation_id: str, success: bool): + """Record the result of a delegated task""" + if success: + self.trust_engine.record_event(agent_id, "task_completed") + self.trust_engine.record_event( + self.delegation_manager.delegations[delegation_id].parent_agent, + "delegation_success" + ) + else: + self.trust_engine.record_event(agent_id, "delegation_failure") + + def get_trust_score(self, agent_id: str) -> Optional[int]: + """Get an agent's trust score""" + trust = self.trust_engine.get(agent_id) + return trust.score if trust else None + + def get_trust_level(self, agent_id: str) -> Optional[TrustLevel]: + """Get an agent's trust level""" + trust = self.trust_engine.get(agent_id) + return trust.level if trust else None + + def _log_audit( + self, + agent_id: str, + action: str, + result: str, + delegation_id: Optional[str], + details: Dict[str, Any] + ): + """Log an audit entry""" + entry = AuditEntry( + timestamp=datetime.utcnow(), + event_type="action_authorization", + agent_id=agent_id, + action=action, + delegation_id=delegation_id, + result=result, + details=details + ) + self.audit_log.append(entry) + + def get_audit_log(self, agent_id: Optional[str] = None) -> List[Dict]: + """Get audit log, optionally filtered by agent""" + entries = self.audit_log + if agent_id: + entries = [e for e in entries if e.agent_id == agent_id] + + return [ + { + "timestamp": e.timestamp.isoformat(), + "event_type": e.event_type, + "agent_id": e.agent_id, + "action": e.action, + "delegation_id": e.delegation_id, + "result": e.result, + "details": e.details + } + for e in entries + ] + + +# ============================================================================ +# EXAMPLE: GOVERNED MULTI-AGENT SYSTEM +# ============================================================================ + +class 
GovernedAgent: + """An agent that operates within the trust layer""" + + def __init__(self, agent_id: str, trust_layer: TrustLayer): + self.agent_id = agent_id + self.trust_layer = trust_layer + self.current_delegation: Optional[str] = None + + def execute(self, action: str, params: Dict[str, Any]) -> Dict[str, Any]: + """Execute an action through the trust layer""" + + allowed, reason = self.trust_layer.authorize_action( + self.agent_id, + action, + self.current_delegation + ) + + if not allowed: + return { + "success": False, + "error": f"Action denied: {reason}", + "trust_score": self.trust_layer.get_trust_score(self.agent_id) + } + + # Simulate action execution + result = self._execute_action(action, params) + + return { + "success": True, + "result": result, + "trust_score": self.trust_layer.get_trust_score(self.agent_id) + } + + def _execute_action(self, action: str, params: Dict[str, Any]) -> str: + """Simulate executing an action""" + return f"Executed {action} with params {params}" + + +# ============================================================================ +# MAIN DEMO +# ============================================================================ + +def main(): + print("🀝 Multi-Agent Trust Layer Demo") + print("=" * 40) + + # Initialize trust layer + trust_layer = TrustLayer() + + # Add role policies + trust_layer.policy_engine.add_role_policy("researcher", { + "base_trust_required": 500, + "allowed_actions": ["web_search", "read_document", "summarize", "analyze"], + "denied_actions": ["execute_code", "send_email", "delete_file"] + }) + + trust_layer.policy_engine.add_role_policy("writer", { + "base_trust_required": 600, + "allowed_actions": ["write_document", "edit_document", "summarize"], + "denied_actions": ["execute_code", "web_search"] + }) + + trust_layer.policy_engine.add_role_policy("orchestrator", { + "base_trust_required": 800, + "allowed_actions": [], # Can do anything not explicitly denied + "denied_actions": ["delete_system_files"] + 
}) + + # Register agents + print("\nπŸ“‹ Registering agents...") + + trust_layer.register_agent( + agent_id="orchestrator-001", + human_sponsor="alice@company.com", + organization="Acme Corp", + roles=["orchestrator"], + initial_trust=900 + ) + print("βœ… Registered: orchestrator-001 (Sponsor: alice@company.com)") + + trust_layer.register_agent( + agent_id="researcher-002", + human_sponsor="bob@company.com", + organization="Acme Corp", + roles=["researcher"], + initial_trust=750 + ) + print("βœ… Registered: researcher-002 (Sponsor: bob@company.com)") + + trust_layer.register_agent( + agent_id="writer-003", + human_sponsor="carol@company.com", + organization="Acme Corp", + roles=["writer"], + initial_trust=700 + ) + print("βœ… Registered: writer-003 (Sponsor: carol@company.com)") + + # Create delegation chain + print("\nπŸ” Creating delegation chain...") + + delegation_id = trust_layer.create_delegation( + from_agent="orchestrator-001", + to_agent="researcher-002", + scope={ + "allowed_actions": ["web_search", "summarize"], + "allowed_domains": ["arxiv.org", "github.com"], + "max_tokens": 50000 + }, + task_description="Research recent papers on AI safety", + time_limit_minutes=30 + ) + print(f"βœ… Delegation: orchestrator-001 β†’ researcher-002") + print(f" ID: {delegation_id}") + print(f" Scope: web_search, summarize") + print(f" Time Limit: 30 minutes") + + # Create governed agents + researcher = GovernedAgent("researcher-002", trust_layer) + researcher.current_delegation = delegation_id + + writer = GovernedAgent("writer-003", trust_layer) + + # Test actions + print("\n" + "=" * 40) + print("πŸ§ͺ Testing Agent Actions") + print("=" * 40) + + # Test 1: Allowed action within delegation + print("\nπŸ€– researcher-002: web_search (within scope)") + result = researcher.execute("web_search", {"query": "AI safety papers 2024"}) + print(f" Result: {'βœ… ALLOWED' if result['success'] else '❌ DENIED'}") + print(f" Trust Score: {result['trust_score']}") + + # Test 2: 
Denied action outside delegation + print("\nπŸ€– researcher-002: send_email (outside scope)") + result = researcher.execute("send_email", {"to": "test@example.com"}) + print(f" Result: {'βœ… ALLOWED' if result['success'] else '❌ DENIED'}") + if not result['success']: + print(f" Reason: {result['error']}") + print(f" Trust Score: {result['trust_score']}") + + # Test 3: Writer tries action not in role + print("\nπŸ€– writer-003: web_search (not in role)") + result = writer.execute("web_search", {"query": "test"}) + print(f" Result: {'βœ… ALLOWED' if result['success'] else '❌ DENIED'}") + if not result['success']: + print(f" Reason: {result['error']}") + print(f" Trust Score: {result['trust_score']}") + + # Test 4: Writer does allowed action + print("\nπŸ€– writer-003: write_document (in role)") + result = writer.execute("write_document", {"content": "Report draft"}) + print(f" Result: {'βœ… ALLOWED' if result['success'] else '❌ DENIED'}") + print(f" Trust Score: {result['trust_score']}") + + # Show final trust scores + print("\n" + "=" * 40) + print("πŸ“Š Final Trust Scores") + print("=" * 40) + + for agent_id in ["orchestrator-001", "researcher-002", "writer-003"]: + score = trust_layer.get_trust_score(agent_id) + level = trust_layer.get_trust_level(agent_id) + print(f" {agent_id}: {score} ({level.value})") + + # Show audit log + print("\n" + "=" * 40) + print("πŸ“‹ Audit Log") + print("=" * 40) + + for entry in trust_layer.get_audit_log(): + status = "βœ…" if entry["result"] == "allowed" else "❌" + print(f" {status} {entry['agent_id']}: {entry['action']} - {entry['result']}") + + print("\nβœ… Demo complete!") + + +if __name__ == "__main__": + main() diff --git a/advanced_ai_agents/multi_agent_apps/multi_agent_trust_layer/requirements.txt b/advanced_ai_agents/multi_agent_apps/multi_agent_trust_layer/requirements.txt new file mode 100644 index 0000000..aa2b704 --- /dev/null +++ b/advanced_ai_agents/multi_agent_apps/multi_agent_trust_layer/requirements.txt @@ -0,0 +1 
@@ +openai>=1.0.0 diff --git a/advanced_ai_agents/single_agent_apps/ai_agent_governance/README.md b/advanced_ai_agents/single_agent_apps/ai_agent_governance/README.md new file mode 100644 index 0000000..99b1fe0 --- /dev/null +++ b/advanced_ai_agents/single_agent_apps/ai_agent_governance/README.md @@ -0,0 +1,182 @@ +# πŸ›‘οΈ AI Agent Governance - Policy-Based Sandboxing + +Learn how to build a governance layer that enforces deterministic policies on AI agents, preventing dangerous actions before they execute. + +## Features + +- **Policy-Based Sandboxing**: Define what your AI agent can and cannot do using declarative policies +- **Action Interception**: Catch and validate agent actions before execution +- **Audit Logging**: Full trail of agent actions for compliance and debugging +- **File System Guards**: Restrict read/write to specific directories +- **Network Guards**: Allowlist-only external API access +- **Rate Limiting**: Prevent runaway agents with configurable limits + +## How It Works + +1. **Policy Definition**: Define your security policies in YAML format +2. **Action Wrapping**: Wrap your agent's tools with the governance layer +3. **Interception**: Before any tool executes, the policy engine validates the action +4. **Decision**: Actions are allowed, denied, or require human approval +5. 
**Audit**: All decisions are logged for compliance and debugging + +``` +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ Agent │────▢│ Governance │────▢│ Tool β”‚ +β”‚ (LLM) β”‚ β”‚ Layer β”‚ β”‚ Execution β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ + β”Œβ”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β” + β”‚ Policy β”‚ + β”‚ Engine β”‚ + β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +``` + +## Requirements + +- Python 3.8+ +- OpenAI API key (or any LLM provider) +- Required Python packages (see `requirements.txt`) + +## Installation + +1. Clone this repository: + ```bash + git clone https://github.com/Shubhamsaboo/awesome-llm-apps.git + cd advanced_ai_agents/single_agent_apps/ai_agent_governance + ``` + +2. Install the required packages: + ```bash + pip install -r requirements.txt + ``` + +## Usage + +1. Set your API key: + ```bash + export OPENAI_API_KEY=your-openai-api-key + ``` + +2. Run the governance demo: + ```bash + python ai_agent_governance.py + ``` + +3. Try different actions and see how the policy engine handles them. 
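## Minimal Interception Sketch

Before wiring policies into a real agent, the core interception idea fits in a few self-contained lines: a decorator resolves a tool's `path` argument and checks it against an allowlist before the wrapped function ever runs. This is a simplified sketch — the names `simple_guard` and `ALLOWED_PATHS` are illustrative, not part of this tutorial's actual API:

```python
import os
from functools import wraps

# Illustrative sandbox roots (mirrors the example policy configuration)
ALLOWED_PATHS = ["/workspace", "/tmp"]


class PolicyViolation(Exception):
    """Raised when an action violates a policy."""


def simple_guard(func):
    """Deny any call whose `path` argument escapes the allowed directories."""
    @wraps(func)
    def wrapper(path, *args, **kwargs):
        resolved = os.path.abspath(os.path.expanduser(path))
        # Match on whole path segments so "/tmp" does not also allow "/tmpfoo"
        if not any(resolved == root or resolved.startswith(root + os.sep)
                   for root in ALLOWED_PATHS):
            raise PolicyViolation(f"Path '{resolved}' is outside allowed directories")
        return func(resolved, *args, **kwargs)
    return wrapper


@simple_guard
def read_file(path: str) -> str:
    """A governed tool: the guard runs before the file is ever opened."""
    with open(path) as f:
        return f.read()
```

Calling `read_file("/etc/passwd")` raises `PolicyViolation` before any I/O happens, while paths under `/workspace` or `/tmp` pass through. The full implementation generalizes this pattern into pluggable policy rules driven by a YAML configuration.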
+
+## Example Policy Configuration
+
+```yaml
+policies:
+  filesystem:
+    allowed_paths: ["/workspace", "/tmp"]
+    denied_paths: ["/etc", "/home", "~/.ssh"]
+
+  network:
+    allowed_domains: ["api.openai.com", "api.github.com"]
+    block_all_others: true
+
+  execution:
+    max_actions_per_minute: 60
+    require_approval_for: ["delete_file", "execute_shell"]
+
+  tools:
+    allowed: ["read_file", "write_file", "web_search"]
+    denied: ["execute_code", "send_email"]
+```
+
+## Example Output
+
+```
+πŸ›‘οΈ AI Agent Governance Demo
+============================
+
+πŸ“‹ Loading policy: workspace_sandbox.yaml
+
+πŸ€– Agent request: "Read the contents of /etc/passwd"
+❌ DENIED: Path '/etc/passwd' is outside allowed directories
+
+πŸ€– Agent request: "Write analysis to /workspace/report.md"
+βœ… ALLOWED: Action permitted by policy
+
+πŸ€– Agent request: "Make HTTP request to unknown-api.com"
+❌ DENIED: Domain 'unknown-api.com' not in allowlist
+
+πŸ€– Agent request: "Delete /workspace/temp.txt"
+⏸️ PENDING: Action requires human approval
+   [Y/n]:
+```
+
+## Technical Details
+
+### Policy Engine
+
+The policy engine evaluates actions against a set of rules:
+
+```python
+class PolicyEngine:
+    def evaluate(self, action: Action) -> PolicyResult:
+        # Ask each rule in turn; a rule returns None when it doesn't apply
+        for rule in self.rules:
+            result = rule.evaluate(action)
+            if result and result.is_terminal:
+                return result
+        return PolicyResult(Decision.ALLOW, "no rule blocked", "default")
+```
+
+### Action Interception
+
+Tools are wrapped with governance checks:
+
+```python
+def governed_tool(func):
+    def wrapper(*args, **kwargs):
+        action = Action(name=func.__name__, args=args, kwargs=kwargs)
+        result = policy_engine.evaluate(action)
+
+        if result.decision == Decision.DENY:
+            raise PolicyViolation(result.reason)
+        elif result.decision == Decision.REQUIRE_APPROVAL:
+            if not get_human_approval(action):
+                raise PolicyViolation("Human denied the action")
+
+        # Log the action and decision
+        audit_log.record(action, result)
+
+        return func(*args, **kwargs)
+    return wrapper
+```
+
+### Audit Logging
+
+All actions are logged with full context:
+
+```python
+{
+    "timestamp": "2024-01-15T10:30:00Z",
+    "action": "write_file",
+    "args": {"path": "/workspace/report.md"},
+    "decision": "ALLOW",
+    "policy_matched": "filesystem.allowed_paths",
+    "agent_id": "research-agent-001"
+}
+```
+
+## Key Concepts Learned
+
+1. **Deterministic vs Probabilistic Safety**: Why policy enforcement is more reliable than prompt engineering
+2. **Defense in Depth**: Multiple layers of validation for robust security
+3. **Audit Trails**: Importance of logging for compliance and debugging
+4. **Principle of Least Privilege**: Only grant the permissions agents actually need
+
+## Extending the Tutorial
+
+- Add custom policy rules for your use case
+- Implement human-in-the-loop approval workflows
+- Connect to external policy management systems
+- Add real-time monitoring and alerting
+
+## Related Projects
+
+- [LangChain](https://github.com/langchain-ai/langchain) - LLM application framework
+- [Guardrails AI](https://github.com/guardrails-ai/guardrails) - Input/output validation for LLMs
diff --git a/advanced_ai_agents/single_agent_apps/ai_agent_governance/ai_agent_governance.py b/advanced_ai_agents/single_agent_apps/ai_agent_governance/ai_agent_governance.py
new file mode 100644
index 0000000..c96be0a
--- /dev/null
+++ b/advanced_ai_agents/single_agent_apps/ai_agent_governance/ai_agent_governance.py
@@ -0,0 +1,600 @@
+"""
+πŸ›‘οΈ AI Agent Governance - Policy-Based Sandboxing Tutorial
+
+This tutorial demonstrates how to build a governance layer that enforces
+deterministic policies on AI agents, preventing dangerous actions before execution.
+ +Key concepts: +- Policy-based action validation +- Tool interception and wrapping +- Audit logging for compliance +- Rate limiting and resource controls +""" + +import os +import json +import yaml +import logging +from datetime import datetime, timedelta +from typing import Dict, Any, List, Optional, Callable +from dataclasses import dataclass, field +from enum import Enum +from functools import wraps +from collections import defaultdict +import re + +from openai import OpenAI + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + + +# ============================================================================ +# CORE DATA STRUCTURES +# ============================================================================ + +class Decision(Enum): + """Policy evaluation decision""" + ALLOW = "allow" + DENY = "deny" + REQUIRE_APPROVAL = "require_approval" + + +@dataclass +class Action: + """Represents an action an agent wants to perform""" + name: str + args: tuple = field(default_factory=tuple) + kwargs: Dict[str, Any] = field(default_factory=dict) + timestamp: datetime = field(default_factory=datetime.utcnow) + agent_id: str = "default-agent" + + +@dataclass +class PolicyResult: + """Result of a policy evaluation""" + decision: Decision + reason: str + policy_name: str + is_terminal: bool = True + + +@dataclass +class AuditEntry: + """An entry in the audit log""" + timestamp: datetime + action: Action + decision: Decision + reason: str + policy_matched: str + + +# ============================================================================ +# POLICY ENGINE +# ============================================================================ + +class PolicyRule: + """Base class for policy rules""" + + def __init__(self, name: str): + self.name = name + + def evaluate(self, action: Action) -> Optional[PolicyResult]: + """Evaluate the action against this rule. 
Returns None if rule doesn't apply."""
+        raise NotImplementedError
+
+
+class FilesystemPolicy(PolicyRule):
+    """Policy for filesystem access control"""
+
+    def __init__(self, allowed_paths: List[str], denied_paths: List[str]):
+        super().__init__("filesystem")
+        self.allowed_paths = [os.path.expanduser(p) for p in allowed_paths]
+        self.denied_paths = [os.path.expanduser(p) for p in denied_paths]
+
+    def evaluate(self, action: Action) -> Optional[PolicyResult]:
+        # Check if action involves file paths
+        path = None
+        if action.kwargs.get("path"):
+            path = action.kwargs["path"]
+        elif action.kwargs.get("file_path"):
+            path = action.kwargs["file_path"]
+        elif action.args and isinstance(action.args[0], str) and "/" in action.args[0]:
+            path = action.args[0]
+
+        if not path:
+            return None  # Rule doesn't apply
+
+        path = os.path.abspath(os.path.expanduser(path))
+
+        # Check denied paths first; compare on whole path segments so that
+        # "/etc" blocks "/etc/passwd" but not an unrelated "/etcetera"
+        for denied in self.denied_paths:
+            denied_abs = os.path.abspath(denied)
+            if path == denied_abs or path.startswith(denied_abs + os.sep):
+                return PolicyResult(
+                    decision=Decision.DENY,
+                    reason=f"Path '{path}' matches denied pattern '{denied}'",
+                    policy_name=self.name
+                )
+
+        # Check if path is in allowed paths (again on segment boundaries,
+        # so "/workspace" does not accidentally allow "/workspace2")
+        for allowed in self.allowed_paths:
+            allowed_abs = os.path.abspath(allowed)
+            if path == allowed_abs or path.startswith(allowed_abs + os.sep):
+                return PolicyResult(
+                    decision=Decision.ALLOW,
+                    reason=f"Path '{path}' is within allowed directory '{allowed}'",
+                    policy_name=self.name
+                )
+
+        # Default deny if not explicitly allowed
+        return PolicyResult(
+            decision=Decision.DENY,
+            reason=f"Path '{path}' is outside allowed directories",
+            policy_name=self.name
+        )
+
+
+class NetworkPolicy(PolicyRule):
+    """Policy for network access control"""
+
+    def __init__(self, allowed_domains: List[str], block_all_others: bool = True):
+        super().__init__("network")
+        self.allowed_domains = allowed_domains
+        self.block_all_others = block_all_others
+
+    def evaluate(self, action: Action) -> Optional[PolicyResult]:
+        # Check if action involves URLs or domains
+        url = None
+        for key in ["url", 
"endpoint", "domain", "host"]: + if key in action.kwargs: + url = action.kwargs[key] + break + + if not url: + # Check args for URL patterns + for arg in action.args: + if isinstance(arg, str) and ("http://" in arg or "https://" in arg): + url = arg + break + + if not url: + return None # Rule doesn't apply + + # Extract domain from URL + domain_match = re.search(r"https?://([^/]+)", url) + if domain_match: + domain = domain_match.group(1) + else: + domain = url + + # Check if domain is allowed + for allowed in self.allowed_domains: + if domain == allowed or domain.endswith("." + allowed): + return PolicyResult( + decision=Decision.ALLOW, + reason=f"Domain '{domain}' is in allowlist", + policy_name=self.name + ) + + if self.block_all_others: + return PolicyResult( + decision=Decision.DENY, + reason=f"Domain '{domain}' not in allowlist", + policy_name=self.name + ) + + return None + + +class RateLimitPolicy(PolicyRule): + """Policy for rate limiting agent actions""" + + def __init__(self, max_actions_per_minute: int = 60): + super().__init__("rate_limit") + self.max_actions_per_minute = max_actions_per_minute + self.action_history: List[datetime] = [] + + def evaluate(self, action: Action) -> Optional[PolicyResult]: + now = datetime.utcnow() + cutoff = now - timedelta(minutes=1) + + # Clean old entries + self.action_history = [t for t in self.action_history if t > cutoff] + + if len(self.action_history) >= self.max_actions_per_minute: + return PolicyResult( + decision=Decision.DENY, + reason=f"Rate limit exceeded: {len(self.action_history)}/{self.max_actions_per_minute} actions per minute", + policy_name=self.name + ) + + self.action_history.append(now) + return None # Allow - doesn't block + + +class ApprovalRequiredPolicy(PolicyRule): + """Policy requiring human approval for certain actions""" + + def __init__(self, actions_requiring_approval: List[str]): + super().__init__("approval_required") + self.actions_requiring_approval = actions_requiring_approval + + def 
evaluate(self, action: Action) -> Optional[PolicyResult]: + if action.name in self.actions_requiring_approval: + return PolicyResult( + decision=Decision.REQUIRE_APPROVAL, + reason=f"Action '{action.name}' requires human approval", + policy_name=self.name + ) + return None + + +class PolicyEngine: + """Central policy evaluation engine""" + + def __init__(self): + self.rules: List[PolicyRule] = [] + self.audit_log: List[AuditEntry] = [] + + def add_rule(self, rule: PolicyRule): + """Add a policy rule to the engine""" + self.rules.append(rule) + + def evaluate(self, action: Action) -> PolicyResult: + """Evaluate an action against all policy rules""" + for rule in self.rules: + result = rule.evaluate(action) + if result and result.is_terminal: + self._log_audit(action, result) + return result + + # Default allow if no rule blocks + result = PolicyResult( + decision=Decision.ALLOW, + reason="No policy rule blocked this action", + policy_name="default" + ) + self._log_audit(action, result) + return result + + def _log_audit(self, action: Action, result: PolicyResult): + """Log action and decision to audit trail""" + entry = AuditEntry( + timestamp=datetime.utcnow(), + action=action, + decision=result.decision, + reason=result.reason, + policy_matched=result.policy_name + ) + self.audit_log.append(entry) + logger.info(f"AUDIT: {result.decision.value.upper()} - {action.name} - {result.reason}") + + def get_audit_log(self) -> List[Dict]: + """Get audit log as serializable dictionaries""" + return [ + { + "timestamp": entry.timestamp.isoformat(), + "action": entry.action.name, + "action_args": str(entry.action.kwargs), + "decision": entry.decision.value, + "reason": entry.reason, + "policy_matched": entry.policy_matched + } + for entry in self.audit_log + ] + + @classmethod + def from_yaml(cls, yaml_content: str) -> "PolicyEngine": + """Create a PolicyEngine from YAML configuration""" + config = yaml.safe_load(yaml_content) + engine = cls() + + policies = 
config.get("policies", {}) + + if "filesystem" in policies: + fs = policies["filesystem"] + engine.add_rule(FilesystemPolicy( + allowed_paths=fs.get("allowed_paths", []), + denied_paths=fs.get("denied_paths", []) + )) + + if "network" in policies: + net = policies["network"] + engine.add_rule(NetworkPolicy( + allowed_domains=net.get("allowed_domains", []), + block_all_others=net.get("block_all_others", True) + )) + + if "execution" in policies: + exe = policies["execution"] + if "max_actions_per_minute" in exe: + engine.add_rule(RateLimitPolicy(exe["max_actions_per_minute"])) + if "require_approval_for" in exe: + engine.add_rule(ApprovalRequiredPolicy(exe["require_approval_for"])) + + return engine + + +# ============================================================================ +# GOVERNANCE WRAPPER +# ============================================================================ + +class PolicyViolation(Exception): + """Raised when an action violates a policy""" + pass + + +def get_human_approval(action: Action) -> bool: + """Get human approval for an action (interactive prompt)""" + print(f"\n⏸️ APPROVAL REQUIRED") + print(f" Action: {action.name}") + print(f" Args: {action.kwargs}") + response = input(" Approve? [y/N]: ").strip().lower() + return response == "y" + + +def governed_tool(policy_engine: PolicyEngine, require_interactive_approval: bool = True): + """ + Decorator that wraps a tool function with governance checks. 
+ + Args: + policy_engine: The PolicyEngine to use for evaluation + require_interactive_approval: If True, prompt for approval when needed + """ + def decorator(func: Callable) -> Callable: + @wraps(func) + def wrapper(*args, **kwargs): + # Create action representation + action = Action( + name=func.__name__, + args=args, + kwargs=kwargs + ) + + # Evaluate against policies + result = policy_engine.evaluate(action) + + if result.decision == Decision.DENY: + raise PolicyViolation(f"❌ DENIED: {result.reason}") + + elif result.decision == Decision.REQUIRE_APPROVAL: + if require_interactive_approval: + if not get_human_approval(action): + raise PolicyViolation("❌ DENIED: Human rejected the action") + else: + raise PolicyViolation(f"⏸️ PENDING: {result.reason}") + + # Action is allowed - execute + print(f"βœ… ALLOWED: {result.reason}") + return func(*args, **kwargs) + + return wrapper + return decorator + + +# ============================================================================ +# EXAMPLE TOOLS (to be governed) +# ============================================================================ + +def create_governed_tools(policy_engine: PolicyEngine) -> Dict[str, Callable]: + """Create a set of governed tools for the agent""" + + @governed_tool(policy_engine) + def read_file(path: str) -> str: + """Read contents of a file""" + with open(path, "r") as f: + return f.read() + + @governed_tool(policy_engine) + def write_file(path: str, content: str) -> str: + """Write content to a file""" + with open(path, "w") as f: + f.write(content) + return f"Successfully wrote {len(content)} characters to {path}" + + @governed_tool(policy_engine) + def delete_file(path: str) -> str: + """Delete a file""" + os.remove(path) + return f"Successfully deleted {path}" + + @governed_tool(policy_engine) + def web_request(url: str) -> str: + """Make a web request (simulated)""" + return f"Simulated response from {url}" + + @governed_tool(policy_engine) + def execute_shell(command: str) -> 
str: + """Execute a shell command (simulated for safety)""" + return f"Simulated execution of: {command}" + + return { + "read_file": read_file, + "write_file": write_file, + "delete_file": delete_file, + "web_request": web_request, + "execute_shell": execute_shell + } + + +# ============================================================================ +# DEMO: SIMPLE AGENT WITH GOVERNANCE +# ============================================================================ + +class GovernedAgent: + """A simple agent with governance layer""" + + def __init__(self, policy_engine: PolicyEngine): + self.policy_engine = policy_engine + self.tools = create_governed_tools(policy_engine) + self.client = OpenAI() + + def run(self, user_request: str) -> str: + """Process a user request with governance""" + + # Create system prompt with available tools + system_prompt = """You are a helpful assistant with access to the following tools: +- read_file(path): Read a file +- write_file(path, content): Write to a file +- delete_file(path): Delete a file +- web_request(url): Make a web request +- execute_shell(command): Execute a shell command + +Analyze the user's request and respond with the tool call you would make. 
+Format: TOOL: tool_name(arg1="value1", arg2="value2") + +If no tool is needed, just respond normally.""" + + # Get LLM response + response = self.client.chat.completions.create( + model="gpt-4", + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_request} + ], + max_tokens=500 + ) + + llm_response = response.choices[0].message.content + print(f"\nπŸ€– Agent response: {llm_response}") + + # Parse and execute tool call if present + if "TOOL:" in llm_response: + tool_line = llm_response.split("TOOL:")[1].strip().split("\n")[0] + return self._execute_tool_call(tool_line) + + return llm_response + + def _execute_tool_call(self, tool_call: str) -> str: + """Parse and execute a tool call string""" + # Simple parser for tool_name(kwargs) + match = re.match(r"(\w+)\((.*)\)", tool_call) + if not match: + return f"Could not parse tool call: {tool_call}" + + tool_name = match.group(1) + args_str = match.group(2) + + # Parse kwargs + kwargs = {} + for arg in args_str.split(", "): + if "=" in arg: + key, value = arg.split("=", 1) + kwargs[key.strip()] = value.strip().strip('"\'') + + if tool_name not in self.tools: + return f"Unknown tool: {tool_name}" + + try: + result = self.tools[tool_name](**kwargs) + return f"Tool result: {result}" + except PolicyViolation as e: + return str(e) + except Exception as e: + return f"Tool error: {e}" + + +# ============================================================================ +# MAIN DEMO +# ============================================================================ + +def main(): + print("πŸ›‘οΈ AI Agent Governance Demo") + print("=" * 40) + + # Define policy configuration + policy_yaml = """ +policies: + filesystem: + allowed_paths: + - /workspace + - /tmp + denied_paths: + - /etc + - /home + - ~/.ssh + + network: + allowed_domains: + - api.openai.com + - api.github.com + block_all_others: true + + execution: + max_actions_per_minute: 60 + require_approval_for: + - delete_file + - 
execute_shell +""" + + print("\nπŸ“‹ Loading policy configuration...") + policy_engine = PolicyEngine.from_yaml(policy_yaml) + print("βœ… Policy engine initialized with rules:") + for rule in policy_engine.rules: + print(f" - {rule.name}") + + # Create governed tools + tools = create_governed_tools(policy_engine) + + # Demo: Test various actions + print("\n" + "=" * 40) + print("πŸ“ Testing Governance Layer") + print("=" * 40) + + test_cases = [ + ("read_file", {"path": "/etc/passwd"}), + ("write_file", {"path": "/workspace/report.md", "content": "# Analysis Report\n"}), + ("web_request", {"url": "https://api.github.com/users"}), + ("web_request", {"url": "https://unknown-site.com/api"}), + ("read_file", {"path": "/workspace/data.txt"}), + ] + + for tool_name, kwargs in test_cases: + print(f"\nπŸ€– Testing: {tool_name}({kwargs})") + try: + # We need to create a temp file for the read test + if tool_name == "read_file" and kwargs["path"] == "/workspace/data.txt": + os.makedirs("/workspace", exist_ok=True) + with open("/workspace/data.txt", "w") as f: + f.write("Test data") + + result = tools[tool_name](**kwargs) + print(f" Result: {result}") + except PolicyViolation as e: + print(f" {e}") + except Exception as e: + print(f" Error: {e}") + + # Print audit log + print("\n" + "=" * 40) + print("πŸ“Š Audit Log") + print("=" * 40) + for entry in policy_engine.get_audit_log(): + print(f" {entry['decision'].upper():8} | {entry['action']:15} | {entry['reason'][:50]}") + + # Demo with LLM agent (optional - requires API key) + if os.getenv("OPENAI_API_KEY"): + print("\n" + "=" * 40) + print("πŸ€– LLM Agent Demo (with governance)") + print("=" * 40) + + agent = GovernedAgent(policy_engine) + + demo_requests = [ + "Read the contents of /etc/passwd", + "Write a summary to /workspace/summary.md", + "Make a request to api.github.com to get user info" + ] + + for request in demo_requests: + print(f"\nπŸ‘€ User: {request}") + result = agent.run(request) + print(f"πŸ“€ Result: 
{result}") + else: + print("\nπŸ’‘ Set OPENAI_API_KEY to run the full LLM agent demo") + + print("\nβœ… Demo complete!") + + +if __name__ == "__main__": + main() diff --git a/advanced_ai_agents/single_agent_apps/ai_agent_governance/requirements.txt b/advanced_ai_agents/single_agent_apps/ai_agent_governance/requirements.txt new file mode 100644 index 0000000..a7597e6 --- /dev/null +++ b/advanced_ai_agents/single_agent_apps/ai_agent_governance/requirements.txt @@ -0,0 +1,2 @@ +openai>=1.0.0 +pyyaml>=6.0