Add AI agent governance tutorials

- Add AI Agent Governance tutorial: Policy-based sandboxing for single agents
  - Teaches deterministic policy enforcement concepts
  - Includes filesystem, network, and rate limit policies
  - Full working Python code with audit logging

- Add Multi-Agent Trust Layer tutorial: Secure agent-to-agent communication
  - Teaches trust scoring and behavioral monitoring
  - Includes delegation chains with scope narrowing
  - Full working Python code with audit trail
Author: Imran Siddique
Date: 2026-02-04 17:17:07 -08:00
Commit: 2fec0064a4 (parent: 60b72c6479)
6 changed files with 1797 additions and 0 deletions


@@ -0,0 +1,230 @@
# 🤝 Multi-Agent Trust Layer - Secure Agent-to-Agent Communication
Learn how to build a trust layer for multi-agent systems that enables secure delegation, trust scoring, and policy enforcement between AI agents.
## Features
- **Agent Identity**: Each agent has a verifiable identity with a human sponsor
- **Trust Scoring**: Behavioral monitoring with a 0-1000 trust score
- **Delegation Chains**: Signed delegations whose scope can only narrow as tasks are passed down
- **Policy Enforcement**: Enforce compliance rules across agent interactions
- **Audit Trail**: Full observability of agent-to-agent communications
## How It Works
```
┌─────────────────┐         ┌─────────────────┐
│     Agent A     │◀──────▶│   Trust Layer   │
│ (Orchestrator)  │   TLS   │                 │
└─────────────────┘         │  • Identity     │
                            │  • Trust Score  │
┌─────────────────┐         │  • Delegation   │
│     Agent B     │◀──────▶│  • Policy       │
│  (Specialist)   │   TLS   │  • Audit        │
└─────────────────┘         └─────────────────┘
```
1. **Registration**: Agents register with verified identity and human sponsor
2. **Trust Establishment**: Initial trust score based on sponsor reputation
3. **Delegation**: Parent agents can delegate tasks with narrowed permissions
4. **Monitoring**: All actions are tracked and trust scores updated
5. **Enforcement**: Policies determine what each agent can do
## Requirements
- Python 3.9+ (the code uses built-in generic type hints like `tuple[bool, str]`)
- OpenAI API key (optional: the demo simulates tool calls and only needs a key if you extend agents with real LLM requests)
- Required Python packages (see `requirements.txt`)
## Installation
1. Clone this repository:
```bash
git clone https://github.com/Shubhamsaboo/awesome-llm-apps.git
cd advanced_ai_agents/multi_agent_apps/multi_agent_trust_layer
```
2. Install the required packages:
```bash
pip install -r requirements.txt
```
## Usage
1. Set your API key:
```bash
export OPENAI_API_KEY=your-openai-api-key
```
2. Run the trust layer demo:
```bash
python multi_agent_trust_layer.py
```
3. Watch agents interact through the trust layer with full observability.
## Example: Agent Delegation Chain
```python
# Orchestrator agent creates a delegation for a specialist
delegation_id = trust_layer.create_delegation(
    from_agent="orchestrator-001",
    to_agent="researcher-002",
    scope={
        "allowed_actions": ["web_search", "summarize"],
        "max_tokens": 10000,
        "allowed_domains": ["arxiv.org", "github.com"]
    },
    task_description="Research recent papers on AI safety",
    time_limit_minutes=30
)

# The researcher can only perform actions within the delegated scope
researcher = GovernedAgent("researcher-002", trust_layer)
researcher.current_delegation = delegation_id
result = researcher.execute("web_search", {"query": "AI safety papers 2024"})
```
## Trust Score System
Trust scores range from 0-1000:
| Score Range | Level | Permissions |
|-------------|-------|-------------|
| 900-1000 | Trusted | Full access within role |
| 700-899 | Standard | Normal operations |
| 500-699 | Probation | Limited actions, extra logging |
| 300-499 | Restricted | Human approval required |
| 0-299 | Suspended | No autonomous actions |
### Score Updates
```
# Positive behaviors increase trust
 +10  Successfully completed delegated task
  +5  Stayed within scope boundaries
  +2  Provided accurate information

# Negative behaviors decrease trust
 -50  Attempted action outside scope
 -30  Provided inaccurate information
 -20  Exceeded resource limits
-100  Security violation
```
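Taken together, the table and the adjustments above boil down to a clamped update plus a threshold lookup. Here is a minimal standalone sketch (the tutorial's `TrustScoringEngine` is the full implementation; `apply_event` and `level_for` are illustrative names, not part of its API):

```python
# Event deltas from the table above
ADJUSTMENTS = {
    "task_completed": +10,
    "stayed_in_scope": +5,
    "accurate_output": +2,
    "scope_violation_attempt": -50,
    "inaccurate_output": -30,
    "resource_exceeded": -20,
    "security_violation": -100,
}

def apply_event(score: int, event: str) -> int:
    """Apply an event's delta, clamping the score to the 0-1000 range."""
    return max(0, min(1000, score + ADJUSTMENTS.get(event, 0)))

def level_for(score: int) -> str:
    """Map a score to its trust level using the table's thresholds."""
    for threshold, level in [(900, "Trusted"), (700, "Standard"),
                             (500, "Probation"), (300, "Restricted")]:
        if score >= threshold:
            return level
    return "Suspended"

score = apply_event(860, "scope_violation_attempt")  # 860 - 50 = 810
print(score, level_for(score))                       # 810 Standard
```

The clamp matters: a suspended agent at score 40 that commits a security violation stays at 0 rather than going negative.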
## Example Output
```
🤝 Multi-Agent Trust Layer Demo
================================

📋 Registering agents...
✅ Registered: orchestrator-001 (Human Sponsor: alice@company.com)
✅ Registered: researcher-002 (Human Sponsor: bob@company.com)
✅ Registered: writer-003 (Human Sponsor: carol@company.com)

🔐 Creating delegation chain...
✅ Delegation: orchestrator-001 → researcher-002
   Scope: web_search, summarize
   Time Limit: 30 minutes

🤖 Agent researcher-002 executing: web_search
   Query: "AI safety papers 2024"
   ✅ Action ALLOWED (within delegated scope)
   Trust Score: 850 → 860 (+10)

🤖 Agent researcher-002 executing: send_email
   ❌ Action DENIED (not in delegated scope)
   Trust Score: 860 → 810 (-50)

📊 Trust Scores:
   orchestrator-001: 900 (Trusted)
   researcher-002: 810 (Standard)
   writer-003: 850 (Standard)
```
## Key Concepts
### 1. Agent Identity
Every agent has a cryptographic identity tied to a human sponsor:
```python
@dataclass
class AgentIdentity:
    agent_id: str
    public_key: str
    human_sponsor: str  # Accountable human
    organization: str
    roles: List[str]
    created_at: datetime
```
### 2. Delegation Chains
Delegations form a chain where each link can only narrow scope:
```python
@dataclass
class Delegation:
    delegation_id: str
    parent_agent: str
    child_agent: str
    scope: DelegationScope
    signature: str  # Signed by parent
    parent_delegation: Optional[str]  # Links to parent's delegation
```
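The key invariant is that a link in the chain can never be broader than its parent. A minimal sketch of the intersection logic, mirroring what `DelegationScope.narrow` does in the tutorial code (here with plain dicts for brevity):

```python
def narrow(parent: dict, child: dict) -> dict:
    """Combine parent and child scopes; the result is never broader than the parent."""
    return {
        # Only actions allowed by BOTH scopes survive
        "allowed_actions": set(parent["allowed_actions"]) & set(child["allowed_actions"]),
        # Denials accumulate down the chain
        "denied_actions": set(parent["denied_actions"]) | set(child["denied_actions"]),
        # Resource limits can only shrink
        "max_tokens": min(parent["max_tokens"], child["max_tokens"]),
    }

parent = {"allowed_actions": {"web_search", "summarize", "read_document"},
          "denied_actions": {"send_email"}, "max_tokens": 50000}
child = {"allowed_actions": {"web_search", "execute_code"},
         "denied_actions": set(), "max_tokens": 100000}

# execute_code is dropped (not in parent), send_email stays denied,
# and max_tokens is capped at the parent's 50000
print(narrow(parent, child))
```

Even if a compromised child requests `execute_code` or a bigger token budget, the narrowed scope silently discards the escalation.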
### 3. Policy Enforcement
Policies define what agents can do based on trust and role:
```yaml
policies:
  researcher:
    base_trust_required: 500
    allowed_actions:
      - web_search
      - read_document
      - summarize
    denied_actions:
      - execute_code
      - send_email
    resource_limits:
      max_tokens_per_hour: 100000
      max_api_calls_per_minute: 60
```
## Architecture
```
┌────────────────────────────────────────────────────┐
│                    Trust Layer                     │
├─────────────┬─────────────┬─────────────┬──────────┤
│  Identity   │    Trust    │ Delegation  │  Policy  │
│  Registry   │   Scoring   │   Manager   │  Engine  │
├─────────────┴─────────────┴─────────────┴──────────┤
│                    Audit Logger                    │
└────────────────────────────────────────────────────┘
        ▲               ▲               ▲
        │               │               │
   ┌────┴────┐     ┌────┴────┐     ┌────┴────┐
   │ Agent A │     │ Agent B │     │ Agent C │
   └─────────┘     └─────────┘     └─────────┘
```
## Extending the Tutorial
- Add cryptographic signatures for delegation verification
- Implement reputation systems across organizations
- Add real-time trust score visualization
- Connect to external identity providers (OAuth, SAML)
- Implement secure communication channels (mTLS)
## Related Projects
- [LangGraph](https://github.com/langchain-ai/langgraph) - Multi-agent orchestration
- [CrewAI](https://github.com/joaomdmoura/crewAI) - Multi-agent framework
- [AutoGen](https://github.com/microsoft/autogen) - Multi-agent conversations


@@ -0,0 +1,782 @@
"""
🤝 Multi-Agent Trust Layer Tutorial
This tutorial demonstrates how to build a trust layer for multi-agent systems
that enables secure delegation, trust scoring, and policy enforcement.
Key concepts:
- Agent identity and registration
- Trust scoring based on behavior
- Delegation chains with scope narrowing
- Policy enforcement across agent interactions
- Full audit trail of agent communications
"""
import os
import json
import hashlib
import secrets
import logging
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Set
from dataclasses import dataclass, field
from enum import Enum
from collections import defaultdict
from openai import OpenAI  # not used by the demo itself; kept for extending agents with real LLM calls
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# ============================================================================
# TRUST LEVELS
# ============================================================================
class TrustLevel(Enum):
    """Trust levels based on score ranges"""
    SUSPENDED = "suspended"    # 0-299
    RESTRICTED = "restricted"  # 300-499
    PROBATION = "probation"    # 500-699
    STANDARD = "standard"      # 700-899
    TRUSTED = "trusted"        # 900-1000

    @classmethod
    def from_score(cls, score: int) -> "TrustLevel":
        if score >= 900:
            return cls.TRUSTED
        elif score >= 700:
            return cls.STANDARD
        elif score >= 500:
            return cls.PROBATION
        elif score >= 300:
            return cls.RESTRICTED
        else:
            return cls.SUSPENDED
# ============================================================================
# CORE DATA STRUCTURES
# ============================================================================
@dataclass
class AgentIdentity:
    """Represents an agent's verified identity"""
    agent_id: str
    public_key: str
    human_sponsor: str  # Email of accountable human
    organization: str
    roles: List[str]
    created_at: datetime = field(default_factory=datetime.utcnow)
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class TrustScore:
    """Agent's trust score with history"""
    agent_id: str
    score: int  # 0-1000
    level: TrustLevel
    history: List[Dict[str, Any]] = field(default_factory=list)
    last_updated: datetime = field(default_factory=datetime.utcnow)

    def update(self, delta: int, reason: str):
        """Update trust score with bounds checking"""
        old_score = self.score
        self.score = max(0, min(1000, self.score + delta))
        self.level = TrustLevel.from_score(self.score)
        self.last_updated = datetime.utcnow()
        self.history.append({
            "timestamp": self.last_updated.isoformat(),
            "old_score": old_score,
            "new_score": self.score,
            "delta": delta,
            "reason": reason
        })
@dataclass
class DelegationScope:
    """Defines what an agent can do under a delegation"""
    allowed_actions: Set[str]
    denied_actions: Set[str] = field(default_factory=set)
    allowed_domains: Set[str] = field(default_factory=set)
    max_tokens: int = 10000
    time_limit_minutes: int = 60
    max_sub_delegations: int = 0  # Can this agent delegate further?
    custom_constraints: Dict[str, Any] = field(default_factory=dict)

    def allows_action(self, action: str) -> bool:
        """Check if an action is allowed under this scope"""
        if action in self.denied_actions:
            return False
        if not self.allowed_actions:  # Empty means all allowed
            return True
        return action in self.allowed_actions

    def narrow(self, child_scope: "DelegationScope") -> "DelegationScope":
        """Create a narrowed scope for sub-delegation"""
        return DelegationScope(
            allowed_actions=self.allowed_actions & child_scope.allowed_actions,
            denied_actions=self.denied_actions | child_scope.denied_actions,
            allowed_domains=(self.allowed_domains & child_scope.allowed_domains
                             if self.allowed_domains else child_scope.allowed_domains),
            max_tokens=min(self.max_tokens, child_scope.max_tokens),
            time_limit_minutes=min(self.time_limit_minutes, child_scope.time_limit_minutes),
            max_sub_delegations=min(self.max_sub_delegations, child_scope.max_sub_delegations),
            custom_constraints={**self.custom_constraints, **child_scope.custom_constraints}
        )
@dataclass
class Delegation:
    """A delegation of authority from one agent to another"""
    delegation_id: str
    parent_agent: str
    child_agent: str
    scope: DelegationScope
    task_description: str
    created_at: datetime
    expires_at: datetime
    signature: str  # Signed by parent agent
    parent_delegation_id: Optional[str] = None  # For chain tracking
    tokens_used: int = 0
    is_revoked: bool = False

    def is_valid(self) -> bool:
        """Check if delegation is still valid"""
        if self.is_revoked:
            return False
        if datetime.utcnow() > self.expires_at:
            return False
        if self.tokens_used >= self.scope.max_tokens:
            return False
        return True
@dataclass
class AuditEntry:
    """Audit log entry for agent interactions"""
    timestamp: datetime
    event_type: str
    agent_id: str
    action: str
    delegation_id: Optional[str]
    result: str  # "allowed", "denied", "error"
    details: Dict[str, Any]
    trust_impact: int = 0
# ============================================================================
# IDENTITY REGISTRY
# ============================================================================
class IdentityRegistry:
    """Manages agent identities"""

    def __init__(self):
        self.identities: Dict[str, AgentIdentity] = {}
        self.sponsor_to_agents: Dict[str, List[str]] = defaultdict(list)

    def register(self, identity: AgentIdentity) -> bool:
        """Register a new agent identity"""
        if identity.agent_id in self.identities:
            logger.warning(f"Agent {identity.agent_id} already registered")
            return False
        self.identities[identity.agent_id] = identity
        self.sponsor_to_agents[identity.human_sponsor].append(identity.agent_id)
        logger.info(f"Registered agent: {identity.agent_id} (sponsor: {identity.human_sponsor})")
        return True

    def get(self, agent_id: str) -> Optional[AgentIdentity]:
        """Get agent identity"""
        return self.identities.get(agent_id)

    def revoke(self, agent_id: str, reason: str) -> bool:
        """Revoke an agent's identity"""
        if agent_id in self.identities:
            identity = self.identities.pop(agent_id)
            self.sponsor_to_agents[identity.human_sponsor].remove(agent_id)
            logger.warning(f"Revoked agent: {agent_id} - {reason}")
            return True
        return False
# ============================================================================
# TRUST SCORING ENGINE
# ============================================================================
class TrustScoringEngine:
    """Manages trust scores for agents"""

    # Score adjustments for various events
    SCORE_ADJUSTMENTS = {
        "task_completed": +10,
        "stayed_in_scope": +5,
        "accurate_output": +2,
        "scope_violation_attempt": -50,
        "inaccurate_output": -30,
        "resource_exceeded": -20,
        "security_violation": -100,
        "delegation_success": +15,
        "delegation_failure": -25,
    }

    def __init__(self, initial_score: int = 700):
        self.scores: Dict[str, TrustScore] = {}
        self.initial_score = initial_score

    def initialize(self, agent_id: str, initial_score: Optional[int] = None) -> TrustScore:
        """Initialize trust score for a new agent"""
        # "is not None" so an explicit score of 0 is honored
        score = initial_score if initial_score is not None else self.initial_score
        trust_score = TrustScore(
            agent_id=agent_id,
            score=score,
            level=TrustLevel.from_score(score)
        )
        self.scores[agent_id] = trust_score
        return trust_score

    def get(self, agent_id: str) -> Optional[TrustScore]:
        """Get agent's trust score"""
        return self.scores.get(agent_id)

    def record_event(self, agent_id: str, event_type: str, custom_delta: Optional[int] = None) -> int:
        """Record an event and update trust score"""
        if agent_id not in self.scores:
            self.initialize(agent_id)
        delta = custom_delta if custom_delta is not None else self.SCORE_ADJUSTMENTS.get(event_type, 0)
        self.scores[agent_id].update(delta, event_type)
        logger.info(f"Trust update: {agent_id} {delta:+d} ({event_type}) → {self.scores[agent_id].score}")
        return delta
# ============================================================================
# DELEGATION MANAGER
# ============================================================================
class DelegationManager:
    """Manages delegation chains between agents"""

    def __init__(self, identity_registry: IdentityRegistry, trust_engine: TrustScoringEngine):
        self.delegations: Dict[str, Delegation] = {}
        self.agent_delegations: Dict[str, List[str]] = defaultdict(list)  # agent_id -> [delegation_ids]
        self.identity_registry = identity_registry
        self.trust_engine = trust_engine

    def create_delegation(
        self,
        parent_agent: str,
        child_agent: str,
        scope: DelegationScope,
        task_description: str,
        time_limit_minutes: Optional[int] = None,
        parent_delegation_id: Optional[str] = None
    ) -> Optional[Delegation]:
        """Create a new delegation from parent to child agent"""
        # Verify both agents exist
        if not self.identity_registry.get(parent_agent):
            logger.error(f"Parent agent not found: {parent_agent}")
            return None
        if not self.identity_registry.get(child_agent):
            logger.error(f"Child agent not found: {child_agent}")
            return None
        # Check parent's trust level
        parent_trust = self.trust_engine.get(parent_agent)
        if parent_trust and parent_trust.level == TrustLevel.SUSPENDED:
            logger.error(f"Suspended agent cannot delegate: {parent_agent}")
            return None
        # If this is a sub-delegation, narrow the scope
        if parent_delegation_id:
            parent_del = self.delegations.get(parent_delegation_id)
            if not parent_del or not parent_del.is_valid():
                logger.error(f"Invalid parent delegation: {parent_delegation_id}")
                return None
            if parent_del.scope.max_sub_delegations <= 0:
                logger.error(f"No sub-delegations allowed under: {parent_delegation_id}")
                return None
            scope = parent_del.scope.narrow(scope)
        # Create delegation
        delegation_id = f"del-{secrets.token_hex(8)}"
        time_limit = time_limit_minutes or scope.time_limit_minutes
        delegation = Delegation(
            delegation_id=delegation_id,
            parent_agent=parent_agent,
            child_agent=child_agent,
            scope=scope,
            task_description=task_description,
            created_at=datetime.utcnow(),
            expires_at=datetime.utcnow() + timedelta(minutes=time_limit),
            signature=self._sign_delegation(parent_agent, delegation_id),
            parent_delegation_id=parent_delegation_id
        )
        self.delegations[delegation_id] = delegation
        self.agent_delegations[child_agent].append(delegation_id)
        logger.info(f"Created delegation: {parent_agent} → {child_agent} ({delegation_id})")
        return delegation

    def _sign_delegation(self, agent_id: str, delegation_id: str) -> str:
        """Create a signature for a delegation (simplified - real impl would use crypto)"""
        data = f"{agent_id}:{delegation_id}:{datetime.utcnow().isoformat()}"
        return hashlib.sha256(data.encode()).hexdigest()[:16]

    def validate_action(self, agent_id: str, action: str, delegation_id: Optional[str] = None) -> bool:
        """Validate if an agent can perform an action under their delegation"""
        if delegation_id:
            delegation = self.delegations.get(delegation_id)
            if not delegation:
                return False
            if not delegation.is_valid():
                return False
            if delegation.child_agent != agent_id:
                return False
            return delegation.scope.allows_action(action)
        # Check if agent has any valid delegation allowing this action
        for del_id in self.agent_delegations.get(agent_id, []):
            delegation = self.delegations.get(del_id)
            if delegation and delegation.is_valid() and delegation.scope.allows_action(action):
                return True
        return False

    def revoke_delegation(self, delegation_id: str, reason: str) -> bool:
        """Revoke a delegation"""
        if delegation_id in self.delegations:
            self.delegations[delegation_id].is_revoked = True
            logger.warning(f"Revoked delegation: {delegation_id} - {reason}")
            return True
        return False

    def get_active_delegations(self, agent_id: str) -> List[Delegation]:
        """Get all active delegations for an agent"""
        return [
            self.delegations[del_id]
            for del_id in self.agent_delegations.get(agent_id, [])
            if self.delegations[del_id].is_valid()
        ]
# ============================================================================
# POLICY ENGINE
# ============================================================================
class MultiAgentPolicyEngine:
    """Enforces policies for multi-agent interactions"""

    def __init__(self, trust_engine: TrustScoringEngine, delegation_manager: DelegationManager):
        self.trust_engine = trust_engine
        self.delegation_manager = delegation_manager
        self.role_policies: Dict[str, Dict[str, Any]] = {}

    def add_role_policy(self, role: str, policy: Dict[str, Any]):
        """Add a policy for a specific role"""
        self.role_policies[role] = policy

    def evaluate(
        self,
        agent_id: str,
        action: str,
        roles: List[str],
        delegation_id: Optional[str] = None
    ) -> tuple[bool, str]:
        """Evaluate if an action is allowed"""
        # Check trust level
        trust = self.trust_engine.get(agent_id)
        if not trust:
            return False, "Agent has no trust score"
        if trust.level == TrustLevel.SUSPENDED:
            return False, "Agent is suspended"
        # Check role-based policies
        for role in roles:
            policy = self.role_policies.get(role, {})
            # Check base trust requirement
            min_trust = policy.get("base_trust_required", 0)
            if trust.score < min_trust:
                return False, f"Trust score {trust.score} below minimum {min_trust} for role {role}"
            # Check denied actions
            if action in policy.get("denied_actions", []):
                return False, f"Action '{action}' denied for role {role}"
            # Check allowed actions (if specified, action must be in list)
            allowed = policy.get("allowed_actions", [])
            if allowed and action not in allowed:
                return False, f"Action '{action}' not in allowed list for role {role}"
        # Check delegation
        if delegation_id:
            if not self.delegation_manager.validate_action(agent_id, action, delegation_id):
                return False, f"Action '{action}' not allowed under delegation {delegation_id}"
        # Require approval for restricted agents
        if trust.level == TrustLevel.RESTRICTED:
            return False, "Agent is restricted - requires human approval"
        return True, "Action allowed"
# ============================================================================
# TRUST LAYER (MAIN INTERFACE)
# ============================================================================
class TrustLayer:
    """Main interface for the multi-agent trust layer"""

    def __init__(self):
        self.identity_registry = IdentityRegistry()
        self.trust_engine = TrustScoringEngine()
        self.delegation_manager = DelegationManager(self.identity_registry, self.trust_engine)
        self.policy_engine = MultiAgentPolicyEngine(self.trust_engine, self.delegation_manager)
        self.audit_log: List[AuditEntry] = []

    def register_agent(
        self,
        agent_id: str,
        human_sponsor: str,
        organization: str,
        roles: List[str],
        initial_trust: int = 700
    ) -> bool:
        """Register a new agent with the trust layer"""
        # Generate a key pair (simplified)
        public_key = hashlib.sha256(f"{agent_id}:{secrets.token_hex(16)}".encode()).hexdigest()
        identity = AgentIdentity(
            agent_id=agent_id,
            public_key=public_key,
            human_sponsor=human_sponsor,
            organization=organization,
            roles=roles
        )
        if self.identity_registry.register(identity):
            self.trust_engine.initialize(agent_id, initial_trust)
            return True
        return False

    def create_delegation(
        self,
        from_agent: str,
        to_agent: str,
        scope: Dict[str, Any],
        task_description: str,
        time_limit_minutes: int = 60
    ) -> Optional[str]:
        """Create a delegation from one agent to another"""
        delegation_scope = DelegationScope(
            allowed_actions=set(scope.get("allowed_actions", [])),
            denied_actions=set(scope.get("denied_actions", [])),
            allowed_domains=set(scope.get("allowed_domains", [])),
            max_tokens=scope.get("max_tokens", 10000),
            time_limit_minutes=time_limit_minutes,
            max_sub_delegations=scope.get("max_sub_delegations", 0)
        )
        delegation = self.delegation_manager.create_delegation(
            parent_agent=from_agent,
            child_agent=to_agent,
            scope=delegation_scope,
            task_description=task_description,
            time_limit_minutes=time_limit_minutes
        )
        return delegation.delegation_id if delegation else None

    def authorize_action(
        self,
        agent_id: str,
        action: str,
        delegation_id: Optional[str] = None
    ) -> tuple[bool, str]:
        """Authorize an action for an agent"""
        identity = self.identity_registry.get(agent_id)
        if not identity:
            self._log_audit(agent_id, action, "denied", delegation_id, {"reason": "Unknown agent"})
            return False, "Unknown agent"
        allowed, reason = self.policy_engine.evaluate(
            agent_id=agent_id,
            action=action,
            roles=identity.roles,
            delegation_id=delegation_id
        )
        # Update trust score based on result
        if allowed:
            self.trust_engine.record_event(agent_id, "stayed_in_scope")
        else:
            self.trust_engine.record_event(agent_id, "scope_violation_attempt")
        self._log_audit(
            agent_id, action,
            "allowed" if allowed else "denied",
            delegation_id,
            {"reason": reason}
        )
        return allowed, reason

    def record_task_result(self, agent_id: str, delegation_id: str, success: bool):
        """Record the result of a delegated task"""
        if success:
            self.trust_engine.record_event(agent_id, "task_completed")
            # Guard against unknown delegation IDs
            delegation = self.delegation_manager.delegations.get(delegation_id)
            if delegation:
                self.trust_engine.record_event(delegation.parent_agent, "delegation_success")
        else:
            self.trust_engine.record_event(agent_id, "delegation_failure")

    def get_trust_score(self, agent_id: str) -> Optional[int]:
        """Get an agent's trust score"""
        trust = self.trust_engine.get(agent_id)
        return trust.score if trust else None

    def get_trust_level(self, agent_id: str) -> Optional[TrustLevel]:
        """Get an agent's trust level"""
        trust = self.trust_engine.get(agent_id)
        return trust.level if trust else None

    def _log_audit(
        self,
        agent_id: str,
        action: str,
        result: str,
        delegation_id: Optional[str],
        details: Dict[str, Any]
    ):
        """Log an audit entry"""
        entry = AuditEntry(
            timestamp=datetime.utcnow(),
            event_type="action_authorization",
            agent_id=agent_id,
            action=action,
            delegation_id=delegation_id,
            result=result,
            details=details
        )
        self.audit_log.append(entry)

    def get_audit_log(self, agent_id: Optional[str] = None) -> List[Dict]:
        """Get audit log, optionally filtered by agent"""
        entries = self.audit_log
        if agent_id:
            entries = [e for e in entries if e.agent_id == agent_id]
        return [
            {
                "timestamp": e.timestamp.isoformat(),
                "event_type": e.event_type,
                "agent_id": e.agent_id,
                "action": e.action,
                "delegation_id": e.delegation_id,
                "result": e.result,
                "details": e.details
            }
            for e in entries
        ]
# ============================================================================
# EXAMPLE: GOVERNED MULTI-AGENT SYSTEM
# ============================================================================
class GovernedAgent:
    """An agent that operates within the trust layer"""

    def __init__(self, agent_id: str, trust_layer: TrustLayer):
        self.agent_id = agent_id
        self.trust_layer = trust_layer
        self.current_delegation: Optional[str] = None

    def execute(self, action: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Execute an action through the trust layer"""
        allowed, reason = self.trust_layer.authorize_action(
            self.agent_id,
            action,
            self.current_delegation
        )
        if not allowed:
            return {
                "success": False,
                "error": f"Action denied: {reason}",
                "trust_score": self.trust_layer.get_trust_score(self.agent_id)
            }
        # Simulate action execution
        result = self._execute_action(action, params)
        return {
            "success": True,
            "result": result,
            "trust_score": self.trust_layer.get_trust_score(self.agent_id)
        }

    def _execute_action(self, action: str, params: Dict[str, Any]) -> str:
        """Simulate executing an action"""
        return f"Executed {action} with params {params}"
# ============================================================================
# MAIN DEMO
# ============================================================================
def main():
    print("🤝 Multi-Agent Trust Layer Demo")
    print("=" * 40)

    # Initialize trust layer
    trust_layer = TrustLayer()

    # Add role policies
    trust_layer.policy_engine.add_role_policy("researcher", {
        "base_trust_required": 500,
        "allowed_actions": ["web_search", "read_document", "summarize", "analyze"],
        "denied_actions": ["execute_code", "send_email", "delete_file"]
    })
    trust_layer.policy_engine.add_role_policy("writer", {
        "base_trust_required": 600,
        "allowed_actions": ["write_document", "edit_document", "summarize"],
        "denied_actions": ["execute_code", "web_search"]
    })
    trust_layer.policy_engine.add_role_policy("orchestrator", {
        "base_trust_required": 800,
        "allowed_actions": [],  # Can do anything not explicitly denied
        "denied_actions": ["delete_system_files"]
    })

    # Register agents
    print("\n📋 Registering agents...")
    trust_layer.register_agent(
        agent_id="orchestrator-001",
        human_sponsor="alice@company.com",
        organization="Acme Corp",
        roles=["orchestrator"],
        initial_trust=900
    )
    print("✅ Registered: orchestrator-001 (Sponsor: alice@company.com)")
    trust_layer.register_agent(
        agent_id="researcher-002",
        human_sponsor="bob@company.com",
        organization="Acme Corp",
        roles=["researcher"],
        initial_trust=750
    )
    print("✅ Registered: researcher-002 (Sponsor: bob@company.com)")
    trust_layer.register_agent(
        agent_id="writer-003",
        human_sponsor="carol@company.com",
        organization="Acme Corp",
        roles=["writer"],
        initial_trust=700
    )
    print("✅ Registered: writer-003 (Sponsor: carol@company.com)")

    # Create delegation chain
    print("\n🔐 Creating delegation chain...")
    delegation_id = trust_layer.create_delegation(
        from_agent="orchestrator-001",
        to_agent="researcher-002",
        scope={
            "allowed_actions": ["web_search", "summarize"],
            "allowed_domains": ["arxiv.org", "github.com"],
            "max_tokens": 50000
        },
        task_description="Research recent papers on AI safety",
        time_limit_minutes=30
    )
    print("✅ Delegation: orchestrator-001 → researcher-002")
    print(f"   ID: {delegation_id}")
    print("   Scope: web_search, summarize")
    print("   Time Limit: 30 minutes")

    # Create governed agents
    researcher = GovernedAgent("researcher-002", trust_layer)
    researcher.current_delegation = delegation_id
    writer = GovernedAgent("writer-003", trust_layer)

    # Test actions
    print("\n" + "=" * 40)
    print("🧪 Testing Agent Actions")
    print("=" * 40)

    # Test 1: Allowed action within delegation
    print("\n🤖 researcher-002: web_search (within scope)")
    result = researcher.execute("web_search", {"query": "AI safety papers 2024"})
    print(f"   Result: {'✅ ALLOWED' if result['success'] else '❌ DENIED'}")
    print(f"   Trust Score: {result['trust_score']}")

    # Test 2: Denied action outside delegation
    print("\n🤖 researcher-002: send_email (outside scope)")
    result = researcher.execute("send_email", {"to": "test@example.com"})
    print(f"   Result: {'✅ ALLOWED' if result['success'] else '❌ DENIED'}")
    if not result['success']:
        print(f"   Reason: {result['error']}")
    print(f"   Trust Score: {result['trust_score']}")

    # Test 3: Writer tries action not in role
    print("\n🤖 writer-003: web_search (not in role)")
    result = writer.execute("web_search", {"query": "test"})
    print(f"   Result: {'✅ ALLOWED' if result['success'] else '❌ DENIED'}")
    if not result['success']:
        print(f"   Reason: {result['error']}")
    print(f"   Trust Score: {result['trust_score']}")

    # Test 4: Writer does allowed action
    print("\n🤖 writer-003: write_document (in role)")
    result = writer.execute("write_document", {"content": "Report draft"})
    print(f"   Result: {'✅ ALLOWED' if result['success'] else '❌ DENIED'}")
    print(f"   Trust Score: {result['trust_score']}")

    # Show final trust scores
    print("\n" + "=" * 40)
    print("📊 Final Trust Scores")
    print("=" * 40)
    for agent_id in ["orchestrator-001", "researcher-002", "writer-003"]:
        score = trust_layer.get_trust_score(agent_id)
        level = trust_layer.get_trust_level(agent_id)
        print(f"   {agent_id}: {score} ({level.value})")

    # Show audit log
    print("\n" + "=" * 40)
    print("📋 Audit Log")
    print("=" * 40)
    for entry in trust_layer.get_audit_log():
        status = "✅" if entry["result"] == "allowed" else "❌"
        print(f"   {status} {entry['agent_id']}: {entry['action']} - {entry['result']}")

    print("\n✅ Demo complete!")


if __name__ == "__main__":
    main()


@@ -0,0 +1 @@
openai>=1.0.0


@@ -0,0 +1,182 @@
# 🛡️ AI Agent Governance - Policy-Based Sandboxing
Learn how to build a governance layer that enforces deterministic policies on AI agents, preventing dangerous actions before they execute.
## Features
- **Policy-Based Sandboxing**: Define what your AI agent can and cannot do using declarative policies
- **Action Interception**: Catch and validate agent actions before execution
- **Audit Logging**: Full trail of agent actions for compliance and debugging
- **File System Guards**: Restrict read/write to specific directories
- **Network Guards**: Allowlist-only external API access
- **Rate Limiting**: Prevent runaway agents with configurable limits
## How It Works
1. **Policy Definition**: Define your security policies in YAML format
2. **Action Wrapping**: Wrap your agent's tools with the governance layer
3. **Interception**: Before any tool executes, the policy engine validates the action
4. **Decision**: Actions are allowed, denied, or require human approval
5. **Audit**: All decisions are logged for compliance and debugging
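In miniature, the whole loop above (define policy, wrap tools, intercept, decide, audit) can be sketched like this. This is a self-contained toy with a hard-coded policy dict; the tutorial code drives the same flow from a YAML file and adds rate limits and approvals:

```python
audit_log = []

# Step 1: a toy policy (the tutorial uses YAML for this)
POLICY = {
    "allowed_tools": {"read_file", "write_file", "web_search"},
    "denied_tools": {"execute_code", "send_email"},
}

class PolicyViolation(Exception):
    pass

def governed(func):
    """Step 2: wrap a tool so every call is validated and audited."""
    def wrapper(*args, **kwargs):
        name = func.__name__
        # Steps 3-4: intercept and decide before the tool runs
        if name in POLICY["denied_tools"] or name not in POLICY["allowed_tools"]:
            audit_log.append({"tool": name, "decision": "DENY"})
            raise PolicyViolation(f"Tool '{name}' is not permitted by policy")
        # Step 5: record the allowed call
        audit_log.append({"tool": name, "decision": "ALLOW"})
        return func(*args, **kwargs)
    return wrapper

@governed
def web_search(query: str) -> str:
    return f"results for {query!r}"

@governed
def send_email(to: str) -> str:
    return "sent"

print(web_search("AI safety"))      # allowed and audited
try:
    send_email("test@example.com")  # denied before it ever executes
except PolicyViolation as e:
    print(e)
print(audit_log)
```

The point of the pattern is that the deny happens in the wrapper, so the tool body never runs for a blocked call.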
```
┌─────────────┐      ┌──────────────┐      ┌─────────────┐
│    Agent    │────▶│  Governance  │────▶│    Tool     │
│    (LLM)    │      │    Layer     │      │  Execution  │
└─────────────┘      └──────────────┘      └─────────────┘
                            │
                     ┌──────▼──────┐
                     │   Policy    │
                     │   Engine    │
                     └─────────────┘
```
## Requirements
- Python 3.8+
- OpenAI API key (or any LLM provider)
- Required Python packages (see `requirements.txt`)
## Installation
1. Clone this repository:
```bash
git clone https://github.com/Shubhamsaboo/awesome-llm-apps.git
cd advanced_ai_agents/single_agent_apps/ai_agent_governance
```
2. Install the required packages:
```bash
pip install -r requirements.txt
```
## Usage
1. Set your API key:
```bash
export OPENAI_API_KEY=your-openai-api-key
```
2. Run the governance demo:
```bash
python ai_agent_governance.py
```
3. Try different actions and see how the policy engine handles them.
## Example Policy Configuration
```yaml
policies:
filesystem:
allowed_paths: ["/workspace", "/tmp"]
denied_paths: ["/etc", "/home", "~/.ssh"]
network:
allowed_domains: ["api.openai.com", "api.github.com"]
block_all_others: true
execution:
max_actions_per_minute: 60
require_approval_for: ["delete_file", "execute_shell"]
tools:
allowed: ["read_file", "write_file", "web_search"]
denied: ["execute_code", "send_email"]
```
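The filesystem rules boil down to deterministic prefix matching on normalized paths, with deny rules checked first and a default-deny for anything not explicitly allowed. A minimal standalone sketch of that check (note that simple `startswith` matching is a known limitation — `/workspaceX` would also match `/workspace`):

```python
import os

allowed = ["/workspace", "/tmp"]
denied = ["/etc", "/home"]

def check_path(path: str) -> str:
    # Normalize first so relative paths and "~" can't bypass the rules
    p = os.path.abspath(os.path.expanduser(path))
    if any(p.startswith(os.path.abspath(d)) for d in denied):
        return "DENY"
    if any(p.startswith(os.path.abspath(a)) for a in allowed):
        return "ALLOW"
    return "DENY"  # default-deny for anything not explicitly allowed

print(check_path("/workspace/report.md"))  # ALLOW
print(check_path("/etc/passwd"))           # DENY
```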
## Example Output
```
🛡️ AI Agent Governance Demo
============================
📋 Loading policy: workspace_sandbox.yaml
🤖 Agent request: "Read the contents of /etc/passwd"
❌ DENIED: Path '/etc/passwd' is outside allowed directories
🤖 Agent request: "Write analysis to /workspace/report.md"
✅ ALLOWED: Action permitted by policy
🤖 Agent request: "Make HTTP request to unknown-api.com"
❌ DENIED: Domain 'unknown-api.com' not in allowlist
🤖 Agent request: "Delete /workspace/temp.txt"
⏸️ PENDING: Action requires human approval
   [y/N]:
```
## Technical Details
### Policy Engine
The policy engine evaluates actions against a set of rules:
```python
class PolicyEngine:
    def evaluate(self, action: Action) -> PolicyResult:
        # Check each rule; the first terminal result wins
        for rule in self.rules:
            result = rule.evaluate(action)
            if result and result.is_terminal:
                return result
        # Default allow when no rule blocked the action
        return PolicyResult(Decision.ALLOW, "No rule matched", "default")
```
### Action Interception
Tools are wrapped with governance checks:
```python
def governed_tool(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        action = Action(name=func.__name__, args=args, kwargs=kwargs)
        # Evaluation also records the decision in the audit log
        result = policy_engine.evaluate(action)
        if result.decision == Decision.DENY:
            raise PolicyViolation(result.reason)
        elif result.decision == Decision.REQUIRE_APPROVAL:
            if not get_human_approval(action):
                raise PolicyViolation("Human denied the action")
        return func(*args, **kwargs)
    return wrapper
```
### Audit Logging
All actions are logged with full context:
```python
{
"timestamp": "2024-01-15T10:30:00Z",
"action": "write_file",
"args": {"path": "/workspace/report.md"},
"decision": "ALLOW",
"policy_matched": "filesystem.allowed_paths",
"agent_id": "research-agent-001"
}
```
## Key Concepts Learned
1. **Deterministic vs Probabilistic Safety**: Why policy enforcement is more reliable than prompt engineering
2. **Defense in Depth**: Multiple layers of validation for robust security
3. **Audit Trails**: Importance of logging for compliance and debugging
4. **Principle of Least Privilege**: Only grant the permissions agents actually need
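The first point can be made concrete: a policy check is a pure function of the action, so the same input always yields the same decision, no matter how the request was phrased. A toy illustration (tool names borrowed from the example policy above):

```python
# A deny list enforced in code, not in the prompt
DENIED_TOOLS = {"execute_code", "send_email"}

def evaluate(tool_name: str) -> str:
    # Pure function: identical input, identical decision, every call
    return "DENY" if tool_name in DENIED_TOOLS else "ALLOW"

# No amount of clever prompting changes the outcome:
assert evaluate("send_email") == "DENY"
assert evaluate("send_email") == "DENY"  # same on every call
assert evaluate("read_file") == "ALLOW"
```

Contrast this with a system-prompt instruction like "never send emails", which an LLM follows only probabilistically.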
## Extending the Tutorial
- Add custom policy rules for your use case
- Implement human-in-the-loop approval workflows
- Connect to external policy management systems
- Add real-time monitoring and alerting
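A custom rule slots in by subclassing `PolicyRule` and returning `None` when the rule doesn't apply. A sketch of a hypothetical size-cap rule, with minimal stand-ins for the tutorial's `Action` and `PolicyResult` types so it runs standalone:

```python
from dataclasses import dataclass

# Minimal stand-ins for the tutorial's types (the real ones carry more fields)
@dataclass
class Action:
    name: str
    kwargs: dict

@dataclass
class PolicyResult:
    decision: str
    reason: str

class MaxFileSizePolicy:
    """Hypothetical custom rule: cap the size of any write_file payload."""
    def __init__(self, max_bytes: int):
        self.max_bytes = max_bytes

    def evaluate(self, action):
        if action.name != "write_file":
            return None  # rule doesn't apply to other tools
        content = action.kwargs.get("content", "")
        if len(content.encode()) > self.max_bytes:
            return PolicyResult("DENY", f"Payload exceeds {self.max_bytes} bytes")
        return None  # within limits; let other rules decide

rule = MaxFileSizePolicy(max_bytes=10)
big = Action("write_file", {"path": "/workspace/a.txt", "content": "x" * 100})
print(rule.evaluate(big).decision)  # DENY
```

Registering it is just `engine.add_rule(...)`, the same as the built-in rules.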
## Related Projects
- [LangChain](https://github.com/langchain-ai/langchain) - LLM application framework
- [Guardrails AI](https://github.com/guardrails-ai/guardrails) - Input/output validation for LLMs


@@ -0,0 +1,600 @@
"""
🛡️ AI Agent Governance - Policy-Based Sandboxing Tutorial
This tutorial demonstrates how to build a governance layer that enforces
deterministic policies on AI agents, preventing dangerous actions before execution.
Key concepts:
- Policy-based action validation
- Tool interception and wrapping
- Audit logging for compliance
- Rate limiting and resource controls
"""
import os
import json
import yaml
import logging
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass, field
from enum import Enum
from functools import wraps
from collections import defaultdict
import re
from openai import OpenAI
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# ============================================================================
# CORE DATA STRUCTURES
# ============================================================================
class Decision(Enum):
"""Policy evaluation decision"""
ALLOW = "allow"
DENY = "deny"
REQUIRE_APPROVAL = "require_approval"
@dataclass
class Action:
"""Represents an action an agent wants to perform"""
name: str
args: tuple = field(default_factory=tuple)
kwargs: Dict[str, Any] = field(default_factory=dict)
timestamp: datetime = field(default_factory=datetime.utcnow)
agent_id: str = "default-agent"
@dataclass
class PolicyResult:
"""Result of a policy evaluation"""
decision: Decision
reason: str
policy_name: str
is_terminal: bool = True
@dataclass
class AuditEntry:
"""An entry in the audit log"""
timestamp: datetime
action: Action
decision: Decision
reason: str
policy_matched: str
# ============================================================================
# POLICY ENGINE
# ============================================================================
class PolicyRule:
"""Base class for policy rules"""
def __init__(self, name: str):
self.name = name
def evaluate(self, action: Action) -> Optional[PolicyResult]:
"""Evaluate the action against this rule. Returns None if rule doesn't apply."""
raise NotImplementedError
class FilesystemPolicy(PolicyRule):
"""Policy for filesystem access control"""
def __init__(self, allowed_paths: List[str], denied_paths: List[str]):
super().__init__("filesystem")
self.allowed_paths = [os.path.expanduser(p) for p in allowed_paths]
self.denied_paths = [os.path.expanduser(p) for p in denied_paths]
def evaluate(self, action: Action) -> Optional[PolicyResult]:
# Check if action involves file paths
path = None
if action.kwargs.get("path"):
path = action.kwargs["path"]
elif action.kwargs.get("file_path"):
path = action.kwargs["file_path"]
elif action.args and isinstance(action.args[0], str) and "/" in action.args[0]:
path = action.args[0]
if not path:
return None # Rule doesn't apply
path = os.path.abspath(os.path.expanduser(path))
# Check denied paths first
for denied in self.denied_paths:
if path.startswith(os.path.abspath(denied)):
return PolicyResult(
decision=Decision.DENY,
reason=f"Path '{path}' matches denied pattern '{denied}'",
policy_name=self.name
)
# Check if path is in allowed paths
for allowed in self.allowed_paths:
if path.startswith(os.path.abspath(allowed)):
return PolicyResult(
decision=Decision.ALLOW,
reason=f"Path '{path}' is within allowed directory '{allowed}'",
policy_name=self.name
)
# Default deny if not explicitly allowed
return PolicyResult(
decision=Decision.DENY,
reason=f"Path '{path}' is outside allowed directories",
policy_name=self.name
)
class NetworkPolicy(PolicyRule):
"""Policy for network access control"""
def __init__(self, allowed_domains: List[str], block_all_others: bool = True):
super().__init__("network")
self.allowed_domains = allowed_domains
self.block_all_others = block_all_others
def evaluate(self, action: Action) -> Optional[PolicyResult]:
# Check if action involves URLs or domains
url = None
for key in ["url", "endpoint", "domain", "host"]:
if key in action.kwargs:
url = action.kwargs[key]
break
if not url:
# Check args for URL patterns
for arg in action.args:
if isinstance(arg, str) and ("http://" in arg or "https://" in arg):
url = arg
break
if not url:
return None # Rule doesn't apply
# Extract domain from URL
domain_match = re.search(r"https?://([^/]+)", url)
if domain_match:
domain = domain_match.group(1)
else:
domain = url
# Check if domain is allowed
for allowed in self.allowed_domains:
if domain == allowed or domain.endswith("." + allowed):
return PolicyResult(
decision=Decision.ALLOW,
reason=f"Domain '{domain}' is in allowlist",
policy_name=self.name
)
if self.block_all_others:
return PolicyResult(
decision=Decision.DENY,
reason=f"Domain '{domain}' not in allowlist",
policy_name=self.name
)
return None
class RateLimitPolicy(PolicyRule):
"""Policy for rate limiting agent actions"""
def __init__(self, max_actions_per_minute: int = 60):
super().__init__("rate_limit")
self.max_actions_per_minute = max_actions_per_minute
self.action_history: List[datetime] = []
def evaluate(self, action: Action) -> Optional[PolicyResult]:
now = datetime.utcnow()
cutoff = now - timedelta(minutes=1)
# Clean old entries
self.action_history = [t for t in self.action_history if t > cutoff]
if len(self.action_history) >= self.max_actions_per_minute:
return PolicyResult(
decision=Decision.DENY,
reason=f"Rate limit exceeded: {len(self.action_history)}/{self.max_actions_per_minute} actions per minute",
policy_name=self.name
)
self.action_history.append(now)
return None # Allow - doesn't block
class ApprovalRequiredPolicy(PolicyRule):
"""Policy requiring human approval for certain actions"""
def __init__(self, actions_requiring_approval: List[str]):
super().__init__("approval_required")
self.actions_requiring_approval = actions_requiring_approval
def evaluate(self, action: Action) -> Optional[PolicyResult]:
if action.name in self.actions_requiring_approval:
return PolicyResult(
decision=Decision.REQUIRE_APPROVAL,
reason=f"Action '{action.name}' requires human approval",
policy_name=self.name
)
return None
class PolicyEngine:
"""Central policy evaluation engine"""
def __init__(self):
self.rules: List[PolicyRule] = []
self.audit_log: List[AuditEntry] = []
def add_rule(self, rule: PolicyRule):
"""Add a policy rule to the engine"""
self.rules.append(rule)
def evaluate(self, action: Action) -> PolicyResult:
"""Evaluate an action against all policy rules"""
for rule in self.rules:
result = rule.evaluate(action)
if result and result.is_terminal:
self._log_audit(action, result)
return result
# Default allow if no rule blocks
result = PolicyResult(
decision=Decision.ALLOW,
reason="No policy rule blocked this action",
policy_name="default"
)
self._log_audit(action, result)
return result
def _log_audit(self, action: Action, result: PolicyResult):
"""Log action and decision to audit trail"""
entry = AuditEntry(
timestamp=datetime.utcnow(),
action=action,
decision=result.decision,
reason=result.reason,
policy_matched=result.policy_name
)
self.audit_log.append(entry)
logger.info(f"AUDIT: {result.decision.value.upper()} - {action.name} - {result.reason}")
def get_audit_log(self) -> List[Dict]:
"""Get audit log as serializable dictionaries"""
return [
{
"timestamp": entry.timestamp.isoformat(),
"action": entry.action.name,
"action_args": str(entry.action.kwargs),
"decision": entry.decision.value,
"reason": entry.reason,
"policy_matched": entry.policy_matched
}
for entry in self.audit_log
]
@classmethod
def from_yaml(cls, yaml_content: str) -> "PolicyEngine":
"""Create a PolicyEngine from YAML configuration"""
config = yaml.safe_load(yaml_content)
engine = cls()
policies = config.get("policies", {})
if "filesystem" in policies:
fs = policies["filesystem"]
engine.add_rule(FilesystemPolicy(
allowed_paths=fs.get("allowed_paths", []),
denied_paths=fs.get("denied_paths", [])
))
if "network" in policies:
net = policies["network"]
engine.add_rule(NetworkPolicy(
allowed_domains=net.get("allowed_domains", []),
block_all_others=net.get("block_all_others", True)
))
if "execution" in policies:
exe = policies["execution"]
if "max_actions_per_minute" in exe:
engine.add_rule(RateLimitPolicy(exe["max_actions_per_minute"]))
if "require_approval_for" in exe:
engine.add_rule(ApprovalRequiredPolicy(exe["require_approval_for"]))
return engine
# ============================================================================
# GOVERNANCE WRAPPER
# ============================================================================
class PolicyViolation(Exception):
"""Raised when an action violates a policy"""
pass
def get_human_approval(action: Action) -> bool:
"""Get human approval for an action (interactive prompt)"""
print(f"\n⏸️ APPROVAL REQUIRED")
print(f" Action: {action.name}")
print(f" Args: {action.kwargs}")
response = input(" Approve? [y/N]: ").strip().lower()
return response == "y"
def governed_tool(policy_engine: PolicyEngine, require_interactive_approval: bool = True):
"""
Decorator that wraps a tool function with governance checks.
Args:
policy_engine: The PolicyEngine to use for evaluation
require_interactive_approval: If True, prompt for approval when needed
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
# Create action representation
action = Action(
name=func.__name__,
args=args,
kwargs=kwargs
)
# Evaluate against policies
result = policy_engine.evaluate(action)
if result.decision == Decision.DENY:
raise PolicyViolation(f"❌ DENIED: {result.reason}")
elif result.decision == Decision.REQUIRE_APPROVAL:
if require_interactive_approval:
if not get_human_approval(action):
raise PolicyViolation("❌ DENIED: Human rejected the action")
else:
raise PolicyViolation(f"⏸️ PENDING: {result.reason}")
# Action is allowed - execute
print(f"✅ ALLOWED: {result.reason}")
return func(*args, **kwargs)
return wrapper
return decorator
# ============================================================================
# EXAMPLE TOOLS (to be governed)
# ============================================================================
def create_governed_tools(policy_engine: PolicyEngine) -> Dict[str, Callable]:
"""Create a set of governed tools for the agent"""
@governed_tool(policy_engine)
def read_file(path: str) -> str:
"""Read contents of a file"""
with open(path, "r") as f:
return f.read()
@governed_tool(policy_engine)
def write_file(path: str, content: str) -> str:
"""Write content to a file"""
with open(path, "w") as f:
f.write(content)
return f"Successfully wrote {len(content)} characters to {path}"
@governed_tool(policy_engine)
def delete_file(path: str) -> str:
"""Delete a file"""
os.remove(path)
return f"Successfully deleted {path}"
@governed_tool(policy_engine)
def web_request(url: str) -> str:
"""Make a web request (simulated)"""
return f"Simulated response from {url}"
@governed_tool(policy_engine)
def execute_shell(command: str) -> str:
"""Execute a shell command (simulated for safety)"""
return f"Simulated execution of: {command}"
return {
"read_file": read_file,
"write_file": write_file,
"delete_file": delete_file,
"web_request": web_request,
"execute_shell": execute_shell
}
# ============================================================================
# DEMO: SIMPLE AGENT WITH GOVERNANCE
# ============================================================================
class GovernedAgent:
"""A simple agent with governance layer"""
def __init__(self, policy_engine: PolicyEngine):
self.policy_engine = policy_engine
self.tools = create_governed_tools(policy_engine)
self.client = OpenAI()
def run(self, user_request: str) -> str:
"""Process a user request with governance"""
# Create system prompt with available tools
system_prompt = """You are a helpful assistant with access to the following tools:
- read_file(path): Read a file
- write_file(path, content): Write to a file
- delete_file(path): Delete a file
- web_request(url): Make a web request
- execute_shell(command): Execute a shell command
Analyze the user's request and respond with the tool call you would make.
Format: TOOL: tool_name(arg1="value1", arg2="value2")
If no tool is needed, just respond normally."""
# Get LLM response
response = self.client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_request}
],
max_tokens=500
)
llm_response = response.choices[0].message.content
print(f"\n🤖 Agent response: {llm_response}")
# Parse and execute tool call if present
if "TOOL:" in llm_response:
tool_line = llm_response.split("TOOL:")[1].strip().split("\n")[0]
return self._execute_tool_call(tool_line)
return llm_response
def _execute_tool_call(self, tool_call: str) -> str:
"""Parse and execute a tool call string"""
# Simple parser for tool_name(kwargs)
match = re.match(r"(\w+)\((.*)\)", tool_call)
if not match:
return f"Could not parse tool call: {tool_call}"
tool_name = match.group(1)
args_str = match.group(2)
# Parse kwargs
kwargs = {}
for arg in args_str.split(", "):
if "=" in arg:
key, value = arg.split("=", 1)
kwargs[key.strip()] = value.strip().strip('"\'')
if tool_name not in self.tools:
return f"Unknown tool: {tool_name}"
try:
result = self.tools[tool_name](**kwargs)
return f"Tool result: {result}"
except PolicyViolation as e:
return str(e)
except Exception as e:
return f"Tool error: {e}"
# ============================================================================
# MAIN DEMO
# ============================================================================
def main():
print("🛡️ AI Agent Governance Demo")
print("=" * 40)
# Define policy configuration
policy_yaml = """
policies:
filesystem:
allowed_paths:
- /workspace
- /tmp
denied_paths:
- /etc
- /home
- ~/.ssh
network:
allowed_domains:
- api.openai.com
- api.github.com
block_all_others: true
execution:
max_actions_per_minute: 60
require_approval_for:
- delete_file
- execute_shell
"""
print("\n📋 Loading policy configuration...")
policy_engine = PolicyEngine.from_yaml(policy_yaml)
print("✅ Policy engine initialized with rules:")
for rule in policy_engine.rules:
print(f" - {rule.name}")
# Create governed tools
tools = create_governed_tools(policy_engine)
# Demo: Test various actions
print("\n" + "=" * 40)
print("📝 Testing Governance Layer")
print("=" * 40)
test_cases = [
("read_file", {"path": "/etc/passwd"}),
("write_file", {"path": "/workspace/report.md", "content": "# Analysis Report\n"}),
("web_request", {"url": "https://api.github.com/users"}),
("web_request", {"url": "https://unknown-site.com/api"}),
("read_file", {"path": "/workspace/data.txt"}),
]
for tool_name, kwargs in test_cases:
print(f"\n🤖 Testing: {tool_name}({kwargs})")
try:
# We need to create a temp file for the read test
if tool_name == "read_file" and kwargs["path"] == "/workspace/data.txt":
os.makedirs("/workspace", exist_ok=True)
with open("/workspace/data.txt", "w") as f:
f.write("Test data")
result = tools[tool_name](**kwargs)
print(f" Result: {result}")
except PolicyViolation as e:
print(f" {e}")
except Exception as e:
print(f" Error: {e}")
# Print audit log
print("\n" + "=" * 40)
print("📊 Audit Log")
print("=" * 40)
for entry in policy_engine.get_audit_log():
print(f" {entry['decision'].upper():8} | {entry['action']:15} | {entry['reason'][:50]}")
# Demo with LLM agent (optional - requires API key)
if os.getenv("OPENAI_API_KEY"):
print("\n" + "=" * 40)
print("🤖 LLM Agent Demo (with governance)")
print("=" * 40)
agent = GovernedAgent(policy_engine)
demo_requests = [
"Read the contents of /etc/passwd",
"Write a summary to /workspace/summary.md",
"Make a request to api.github.com to get user info"
]
for request in demo_requests:
print(f"\n👤 User: {request}")
result = agent.run(request)
print(f"📤 Result: {result}")
else:
print("\n💡 Set OPENAI_API_KEY to run the full LLM agent demo")
print("\n✅ Demo complete!")
if __name__ == "__main__":
main()


@@ -0,0 +1,2 @@
openai>=1.0.0
pyyaml>=6.0