addition of pydantic

This commit is contained in:
Madhu
2025-01-30 01:05:58 +05:30
parent b6d2a8ab05
commit acd4e36963
2 changed files with 266 additions and 172 deletions

View File

@@ -8,6 +8,8 @@ from dotenv import load_dotenv
from pydantic import BaseModel, Field
from enum import Enum
import json
from phi.agent import Agent, RunResponse
from phi.model.anthropic import Claude
# Model Constants
DEEPSEEK_MODEL: str = "deepseek-reasoner"
@@ -16,151 +18,71 @@ CLAUDE_MODEL: str = "claude-3-5-sonnet-20241022"
# Load environment variables
load_dotenv()
system_prompt = """You are a Senior Software Expert and Technical Documentation Assistant. Your role is to analyze the structured JSON response from DeepSeek, which contains architectural and technical recommendations across various domains, along with the original user query describing the software system they want to build.
The input consists of:
- The user's original query describing their software requirements
- A structured JSON response containing recommendations for architecture, security, infrastructure, compliance and other technical domains
For each key-value pair in the JSON:
1. Present the key and its corresponding value in a readable report format
2. Format the information in a clear, organized way
3. Do not add your own opinions or suggestions
4. Do not modify or reinterpret the provided information
Keep your responses factual and directly based on the JSON content provided."""
class ArchitecturePattern(str, Enum):
MICROSERVICES = "microservices"
MONOLITHIC = "monolithic"
SERVERLESS = "serverless"
EVENT_DRIVEN = "event_driven"
LAYERED = "layered"
class SecurityLevel(str, Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
VERY_HIGH = "very_high"
class ScalabilityRequirement(str, Enum):
SMALL = "small"
MEDIUM = "medium"
LARGE = "large"
ENTERPRISE = "enterprise"
"""Architectural patterns for system design."""
MICROSERVICES = "microservices" # Decomposed into small, independent services
MONOLITHIC = "monolithic" # Single, unified codebase
SERVERLESS = "serverless" # Function-as-a-Service architecture
EVENT_DRIVEN = "event_driven" # Asynchronous event-based communication
class DatabaseType(str, Enum):
SQL = "sql"
NOSQL = "nosql"
GRAPH = "graph"
TIME_SERIES = "time_series"
HYBRID = "hybrid"
"""Types of database systems."""
SQL = "sql" # Relational databases with ACID properties
NOSQL = "nosql" # Non-relational databases for flexible schemas
HYBRID = "hybrid" # Combined SQL and NoSQL approach
class ComplianceStandard(str, Enum):
HIPAA = "hipaa"
GDPR = "gdpr"
SOC2 = "soc2"
HITECH = "hitech"
ISO27001 = "iso27001"
PCI_DSS = "pci_dss"
class DataClassification(str, Enum):
PHI = "protected_health_information"
PII = "personally_identifiable_information"
CONFIDENTIAL = "confidential"
PUBLIC = "public"
class IntegrationType(str, Enum):
HL7 = "hl7"
FHIR = "fhir"
DICOM = "dicom"
REST = "rest"
SOAP = "soap"
CUSTOM = "custom"
class DataProcessingType(str, Enum):
REAL_TIME = "real_time"
BATCH = "batch"
HYBRID = "hybrid"
class MLCapability(BaseModel):
"""Defines machine learning capabilities and requirements"""
model_type: str = Field(..., description="Type of ML model (e.g., diagnostic, predictive, monitoring)")
training_frequency: str = Field(..., description="How often the model needs retraining")
input_data_types: List[str] = Field(..., description="Types of data the model processes")
performance_requirements: Dict[str, float] = Field(..., description="Required metrics like accuracy, latency")
hardware_requirements: Dict[str, str] = Field(..., description="GPU/CPU/Memory requirements")
regulatory_constraints: List[str] = Field(..., description="Regulatory requirements for ML models")
class SecurityMeasure(BaseModel):
"""Enhanced security measures for healthcare systems"""
measure_type: str
implementation_priority: int = Field(ge=1, le=5, description="Priority level for implementation")
compliance_standards: List[ComplianceStandard]
estimated_setup_time_days: int
data_classification: DataClassification
encryption_requirements: Dict[str, str] = Field(..., description="Encryption requirements for different states")
access_control_policy: Dict[str, List[str]] = Field(..., description="Role-based access control definitions")
audit_requirements: List[str] = Field(..., description="Audit logging requirements")
"""Regulatory compliance standards."""
HIPAA = "hipaa" # Healthcare data protection
GDPR = "gdpr" # EU data privacy regulation
SOC2 = "soc2" # Service organization security controls
ISO27001 = "iso27001" # Information security management
class ArchitectureDecision(BaseModel):
"""Architecture decision details"""
"""Represents architectural decisions and their justifications."""
pattern: ArchitecturePattern
rationale: str
trade_offs: Dict[str, List[str]]
estimated_cost: Dict[str, float]
rationale: str = Field(..., min_length=50) # Detailed explanation for the choice
trade_offs: Dict[str, List[str]] = Field(..., alias="trade_offs") # Pros and cons
estimated_cost: Dict[str, float] # Cost breakdown
class SecurityMeasure(BaseModel):
"""Security controls and implementation details."""
measure_type: str # Type of security measure
implementation_priority: int = Field(..., ge=1, le=5) # Priority level 1-5
compliance_standards: List[ComplianceStandard] # Applicable standards
data_classification: str # Data sensitivity level
class InfrastructureResource(BaseModel):
"""Infrastructure resource requirements"""
resource_type: str
specifications: Dict[str, str]
scaling_policy: Dict[str, Any]
estimated_cost: float
"""Infrastructure components and specifications."""
resource_type: str # Type of infrastructure resource
specifications: Dict[str, str] # Technical specifications
scaling_policy: Dict[str, str] # Scaling rules and thresholds
estimated_cost: float # Estimated cost per resource
class DataIntegration(BaseModel):
"""Data integration specifications"""
integration_type: IntegrationType
data_format: str
frequency: str
volume: str
security_requirements: Dict[str, str]
class TechnicalAnalysis(BaseModel):
"""Complete technical analysis of the system architecture."""
architecture_decision: ArchitectureDecision # Core architecture choices
infrastructure_resources: List[InfrastructureResource] # Required resources
security_measures: List[SecurityMeasure] # Security controls
database_choice: DatabaseType # Database architecture
compliance_requirements: List[ComplianceStandard] = [] # Required standards
performance_requirements: List[Dict[str, Union[str, float]]] = [] # Performance metrics
risk_assessment: Dict[str, str] = {} # Identified risks and mitigations
class PerformanceRequirement(BaseModel):
"""Performance requirements specification"""
metric_name: str
target_value: float
measurement_unit: str
priority: int
class AuditConfig(BaseModel):
"""Audit configuration settings"""
log_retention_period: int
audit_events: List[str]
compliance_mapping: Dict[str, List[str]]
class APIConfig(BaseModel):
"""API configuration settings"""
version: str
auth_method: str
rate_limits: Dict[str, int]
documentation_url: str
class ErrorHandlingConfig(BaseModel):
"""Error handling configuration"""
retry_policy: Dict[str, Any]
fallback_strategies: List[str]
notification_channels: List[str]
class ProjectAnalysis(BaseModel):
"""Enhanced project analysis for healthcare systems"""
architecture_decision: ArchitectureDecision
infrastructure_resources: List[InfrastructureResource]
security_measures: List[SecurityMeasure]
database_choice: DatabaseType
estimated_team_size: int
critical_path_components: List[str]
risk_assessment: Dict[str, str]
maintenance_considerations: List[str]
# Healthcare-specific fields
compliance_requirements: List[ComplianceStandard]
data_integrations: List[DataIntegration]
ml_capabilities: List[MLCapability]
performance_requirements: List[PerformanceRequirement]
data_retention_policy: Dict[str, str]
disaster_recovery: Dict[str, Any]
interoperability_standards: List[str]
# New fields
audit_config: AuditConfig
api_config: APIConfig
error_handling: ErrorHandlingConfig
class ModelChain:
def __init__(self, deepseek_api_key: str, anthropic_api_key: str) -> None:
@@ -169,15 +91,22 @@ class ModelChain:
base_url="https://api.deepseek.com"
)
self.claude_client = anthropic.Anthropic(api_key=anthropic_api_key)
self.agent = Agent(
model=Claude(id="claude-3-5-sonnet-20241022", api_key=anthropic_api_key),
system_prompt=system_prompt,
markdown=True
)
self.deepseek_messages: List[Dict[str, str]] = []
self.claude_messages: List[Dict[str, Any]] = []
self.current_model: str = CLAUDE_MODEL
def get_deepseek_reasoning(self, user_input: str) -> str:
def get_deepseek_reasoning(self, user_input: str) -> tuple[str, str]:
start_time = time.time()
system_prompt = """You are an expert software architect and technical advisor. Analyze the user's project requirements
and provide structured reasoning about architecture, tools, and implementation strategies.
IMPORTANT: Reason why you are choosing a particular architecture pattern, database type, etc. for user understanding in your reasoning.
IMPORTANT: Your response must be a valid JSON object (not a string or any other format) that matches the schema provided below.
Do not include any explanatory text, markdown formatting, or code blocks - only return the JSON object.
@@ -282,53 +211,34 @@ class ModelChain:
time_str = f"{elapsed_time/60:.1f} minutes" if elapsed_time >= 60 else f"{elapsed_time:.1f} seconds"
st.caption(f"⏱️ Analysis completed in {time_str}")
# Return the validated structured output for Claude
return reasoning_content
# Return both reasoning and normal content
return reasoning_content, normal_content
except Exception as e:
st.error(f"Error in DeepSeek analysis: {str(e)}")
return "Error occurred while analyzing"
def get_claude_response(self, user_input: str, reasoning: str) -> str:
system_prompt = """You are a senior software architect and implementation advisor. Using the provided technical analysis,
give detailed, actionable advice for implementing the solution. Include code snippets, configuration examples, and
step-by-step implementation guidelines where appropriate. Focus on practical implementation details while maintaining
best practices and addressing potential challenges."""
user_message = {
"role": "user",
"content": [{"type": "text", "text": user_input}]
}
assistant_prefill = {
"role": "assistant",
"content": [{"type": "text", "text": f"<thinking>{reasoning}</thinking>"}]
}
messages = [assistant_prefill]
return "Error occurred while analyzing", ""
def get_claude_response(self, user_input: str, deepseek_output: tuple[str, str]) -> str:
try:
reasoning_content, normal_content = deepseek_output
# Create expander for Claude's response
with st.expander("🤖 Claude's Response", expanded=True):
response_placeholder = st.empty()
with self.claude_client.messages.stream(
model=self.current_model,
system=system_prompt,
messages=messages,
max_tokens=8000
) as stream:
full_response = ""
for text in stream.text_stream:
full_response += text
response_placeholder.markdown(full_response)
# Prepare the message with user input, reasoning and normal output
message = f"""User Query: {user_input}
self.claude_messages.extend([user_message, {
"role": "assistant",
"content": [{"type": "text", "text": full_response}]
}])
DeepSeek Reasoning: {reasoning_content}
return full_response
DeepSeek Technical Analysis: {normal_content}"""
# Use Phi Agent to get response
response: RunResponse = self.agent.run(
message=message
)
return response.content
except Exception as e:
st.error(f"Error in Claude response: {str(e)}")
@@ -374,11 +284,11 @@ def main() -> None:
# Get AI response
with st.chat_message("assistant"):
with st.spinner("🤔 Thinking..."):
reasoning = chain.get_deepseek_reasoning(prompt)
deepseek_output = chain.get_deepseek_reasoning(prompt)
with st.spinner("✍️ Responding..."):
response = chain.get_claude_response(prompt, reasoning)
response = chain.get_claude_response(prompt, deepseek_output)
st.session_state.messages.append({"role": "assistant", "content": response})
if __name__ == "__main__":

View File

@@ -0,0 +1,184 @@
from enum import Enum
from typing import List, Dict, Union
from pydantic import BaseModel, Field, ValidationError
import streamlit as st
from openai import OpenAI
import anthropic
import json
import re
import os
from dotenv import load_dotenv
from phi.agent import Agent, RunResponse
from phi.model.anthropic import Claude
load_dotenv()
# --------------------------
# Enums & Data Models
# --------------------------
class ArchitecturePattern(str, Enum):
MICROSERVICES = "microservices"
MONOLITHIC = "monolithic"
SERVERLESS = "serverless"
EVENT_DRIVEN = "event_driven"
class DatabaseType(str, Enum):
SQL = "sql"
NOSQL = "nosql"
HYBRID = "hybrid"
class ComplianceStandard(str, Enum):
HIPAA = "hipaa"
GDPR = "gdpr"
SOC2 = "soc2"
ISO27001 = "iso27001"
class ArchitectureDecision(BaseModel):
pattern: ArchitecturePattern
rationale: str = Field(..., min_length=50)
trade_offs: Dict[str, List[str]] = Field(..., alias="trade_offs")
estimated_cost: Dict[str, float]
class SecurityMeasure(BaseModel):
measure_type: str
implementation_priority: int = Field(..., ge=1, le=5)
compliance_standards: List[ComplianceStandard]
data_classification: str
class InfrastructureResource(BaseModel):
resource_type: str
specifications: Dict[str, str]
scaling_policy: Dict[str, str]
estimated_cost: float
class TechnicalAnalysis(BaseModel):
architecture_decision: ArchitectureDecision
infrastructure_resources: List[InfrastructureResource]
security_measures: List[SecurityMeasure]
database_choice: DatabaseType
compliance_requirements: List[ComplianceStandard] = []
performance_requirements: List[Dict[str, Union[str, float]]] = []
risk_assessment: Dict[str, str] = {}
# --------------------------
# Core Implementation
# --------------------------
class ArchitectureAnalyzer:
def __init__(self, deepseek_api_key: str, anthropic_api_key: str):
self.deepseek_client = OpenAI(
api_key=deepseek_api_key,
base_url="https://api.deepseek.com"
)
self.claude_agent = Agent(
model=Claude(
id="claude-3-5-sonnet-20241022",
api_key=anthropic_api_key
),
markdown=True,
)
self.reasoning_content = ""
self.deepseek_prompt = f"""Analyze software requirements and return JSON with:
{{
"architecture_decision": {{
"pattern": "{'|'.join([e.value for e in ArchitecturePattern])}",
"rationale": "technical justification",
"trade_offs": {{"pros": [], "cons": []}},
"estimated_cost": {{"development": float, "maintenance": float}}
}},
"infrastructure_resources": [{{"resource_type": "...", "specifications": {{}}, ...}}],
"security_measures": [{{"measure_type": "...", "priority": 1-5, ...}}],
"database_choice": "{'|'.join([e.value for e in DatabaseType])}",
"compliance_requirements": ["..."],
"performance_requirements": [{{"metric": "...", "target": float}}]
}}"""
def _extract_json(self, text: str) -> dict:
try:
json_str = re.search(r'\{.*\}', text, re.DOTALL).group()
return json.loads(json_str)
except (AttributeError, json.JSONDecodeError) as e:
st.error(f"JSON extraction failed: {str(e)}")
st.text("Raw response:\n" + text)
raise
def analyze_requirements(self, user_input: str) -> TechnicalAnalysis:
try:
response1 = self.deepseek_client.chat.completions.create(
model="deepseek-reasoner",
messages=[
{"role": "system", "content": self.deepseek_prompt},
{"role": "user", "content": user_input}
],
temperature=0.2,
max_tokens=2000
)
self.reasoning_content = response1.choices[0].message.reasoning_content
json_data = self._extract_json(response1.choices[0].message.content)
return TechnicalAnalysis(**json_data)
except ValidationError as e:
st.error(f"Validation error: {e.errors()}")
st.json(json_data)
raise
def generate_report(self, analysis: TechnicalAnalysis) -> str:
report_prompt = f"""Convert this technical analysis into a executive report:
{analysis.model_dump_json(indent=2)}
Use markdown with:
# Title
## Sections
- Bullet points
**Bold important items**
Tables for cost/performance"""
response = self.claude_agent.run(report_prompt)
return response.content
# --------------------------
# Streamlit UI
# --------------------------
def main():
st.title("🏗️ AI Architecture Advisor")
with st.sidebar:
st.header("🔑 Setup")
deepseek_api_key = st.text_input("DeepSeek Key", type="password")
anthropic_api_key = st.text_input("Claude Key", type="password")
if "analysis" not in st.session_state:
st.session_state.analysis = None
if prompt := st.chat_input("Describe your system requirements:"):
if not all([deepseek_api_key, anthropic_api_key]):
st.error("Missing API keys")
return
analyzer = ArchitectureAnalyzer(deepseek_api_key, anthropic_api_key)
with st.status("🔨 Processing...", expanded=True):
try:
# Analysis Phase
st.write("🧠 Analyzing requirements...")
analysis = analyzer.analyze_requirements(prompt)
st.session_state.analysis = analysis
with st.expander("reasoning"):
st.markdown(analyzer.reasoning_content)
# Reporting Phase
st.write("📊 Generating report...")
report = analyzer.generate_report(analysis)
# Display Results
st.success("Analysis complete!")
st.markdown(report)
with st.expander("📁 Raw Analysis Data"):
st.json(analysis.model_dump_json())
except Exception as e:
st.error(f"Processing failed: {str(e)}")
if __name__ == "__main__":
main()