diff --git a/modules/source/15_mlops/mlops_dev.py b/modules/source/15_mlops/mlops_dev.py index 95d14264..34857f7b 100644 --- a/modules/source/15_mlops/mlops_dev.py +++ b/modules/source/15_mlops/mlops_dev.py @@ -1574,54 +1574,1234 @@ def test_module_mlops_tinytorch_integration(): # Run the integration test test_module_mlops_tinytorch_integration() +# %% [markdown] +""" +## Step 5: Production MLOps Profiler - Enterprise-Grade MLOps Framework + +### The Challenge: Enterprise MLOps Requirements +Real production systems need more than basic monitoring: +- **Model versioning and lineage**: Track every model iteration and its ancestry +- **Continuous training pipelines**: Automated, scalable training workflows +- **Feature drift detection**: Advanced statistical analysis of input features +- **Model monitoring and alerting**: Comprehensive health and performance tracking +- **Deployment orchestration**: Canary deployments, blue-green deployments +- **Rollback capabilities**: Safe model rollbacks when issues occur +- **Production incident response**: Automated incident detection and response + +### The Enterprise Solution: Production MLOps Profiler +A comprehensive MLOps framework that handles enterprise requirements: +- **Complete model lifecycle**: From development to retirement +- **Production-grade monitoring**: Multi-dimensional tracking and alerting +- **Automated deployment patterns**: Safe deployment strategies +- **Incident response**: Automated detection and recovery +- **Compliance and governance**: Audit trails and model explainability + +### What We'll Build +A `ProductionMLOpsProfiler` that provides: +1. **Model versioning and lineage tracking** for complete audit trails +2. **Continuous training pipelines** with automated scheduling +3. **Advanced feature drift detection** using multiple statistical tests +4. **Comprehensive monitoring** with multi-level alerting +5. **Deployment orchestration** with safe rollout patterns +6. **Production incident response** with automated recovery + +### Real-World Enterprise Applications +- **Financial services**: Regulatory compliance and model governance +- **Healthcare**: FDA-compliant model tracking and validation +- **Autonomous vehicles**: Safety-critical model deployment +- **E-commerce**: High-availability recommendation systems +""" + +# %% nbgrader={"grade": false, "grade_id": "production-mlops-profiler", "locked": false, "schema_version": 3, "solution": true, "task": false} +#| export +@dataclass +class ModelVersion: + """Represents a specific version of a model with metadata.""" + version_id: str + model_name: str + created_at: datetime + training_data_hash: str + performance_metrics: Dict[str, float] + parent_version: Optional[str] = None + tags: Dict[str, str] = field(default_factory=dict) + deployment_config: Dict[str, Any] = field(default_factory=dict) + +@dataclass +class DeploymentStrategy: + """Defines deployment strategy and rollout configuration.""" + strategy_type: str # 'canary', 'blue_green', 'rolling' + traffic_split: Dict[str, float] # {'current': 0.9, 'new': 0.1} + success_criteria: Dict[str, float] + rollback_criteria: Dict[str, float] + monitoring_window: int # seconds + +class ProductionMLOpsProfiler: + """ + Enterprise-grade MLOps profiler for production ML systems. + + Provides comprehensive model lifecycle management, deployment orchestration, + monitoring, and incident response capabilities. + """ + + def __init__(self, system_name: str, production_config: Optional[Dict] = None): + """ + TODO: Initialize the Production MLOps Profiler. + + STEP-BY-STEP IMPLEMENTATION: + 1. Store system configuration: + - system_name: Unique identifier for this MLOps system + - production_config: Enterprise configuration settings + 2. Initialize model registry: + - model_versions: Dict[str, List[ModelVersion]] (model_name -> versions) + - active_deployments: Dict[str, ModelVersion] (deployment_id -> version) + - deployment_history: List[Dict] for audit trails + 3. Set up monitoring infrastructure: + - feature_monitors: Dict[str, Any] for feature drift tracking + - performance_monitors: Dict[str, Any] for model performance + - alert_channels: List[str] for notification endpoints + 4. Initialize deployment orchestration: + - deployment_strategies: Dict[str, DeploymentStrategy] + - rollback_policies: Dict[str, Any] + - traffic_routing: Dict[str, float] + 5. Set up incident response: + - incident_log: List[Dict] for tracking issues + - auto_recovery_policies: Dict[str, Any] + - escalation_rules: List[Dict] + + EXAMPLE USAGE: + ```python + config = { + "monitoring_interval": 300, # 5 minutes + "alert_thresholds": {"accuracy": 0.85, "latency": 500}, + "auto_rollback": True + } + profiler = ProductionMLOpsProfiler("recommendation_system", config) + ``` + + IMPLEMENTATION HINTS: + - Use defaultdict for automatic initialization + - Set reasonable defaults for production_config + - Initialize all tracking dictionaries + - Set up enterprise-grade monitoring defaults + """ + ### BEGIN SOLUTION + self.system_name = system_name + self.production_config = production_config or { + "monitoring_interval": 300, # 5 minutes + "alert_thresholds": {"accuracy": 0.85, "latency": 500, "error_rate": 0.05}, + "auto_rollback": True, + "deployment_timeout": 1800, # 30 minutes + "feature_drift_sensitivity": 0.01, # 1% significance level + "incident_escalation_timeout": 900 # 15 minutes + } + + # Model registry + self.model_versions = defaultdict(list) + self.active_deployments = {} + self.deployment_history = [] + + # Monitoring infrastructure + self.feature_monitors = {} + self.performance_monitors = {} + self.alert_channels = ["email", "slack", "pagerduty"] + + # Deployment orchestration + self.deployment_strategies = { + "canary": DeploymentStrategy( + strategy_type="canary", + traffic_split={"current": 0.95, "new": 0.05}, + success_criteria={"accuracy": 0.90, "latency": 400, "error_rate": 0.02}, + rollback_criteria={"accuracy": 0.85, "latency": 600, "error_rate": 0.10}, + monitoring_window=1800 + ), + "blue_green": DeploymentStrategy( + strategy_type="blue_green", + traffic_split={"current": 1.0, "new": 0.0}, + success_criteria={"accuracy": 0.92, "latency": 350, "error_rate": 0.01}, + rollback_criteria={"accuracy": 0.87, "latency": 500, "error_rate": 0.05}, + monitoring_window=3600 + ) + } + self.rollback_policies = { + "auto_rollback_enabled": True, + "rollback_threshold_breaches": 3, + "rollback_confirmation_required": False + } + self.traffic_routing = {} + + # Incident response + self.incident_log = [] + self.auto_recovery_policies = { + "restart_on_error": True, + "scale_on_load": True, + "rollback_on_failure": True + } + self.escalation_rules = [ + {"level": 1, "timeout": 300, "contacts": ["on_call_engineer"]}, + {"level": 2, "timeout": 900, "contacts": ["ml_team_lead", "devops_team"]}, + {"level": 3, "timeout": 1800, "contacts": ["engineering_manager", "cto"]} + ] + ### END SOLUTION + + def register_model_version(self, model_name: str, model, training_metadata: Dict[str, Any]) -> ModelVersion: + """ + TODO: Register a new model version with complete lineage tracking. + + STEP-BY-STEP IMPLEMENTATION: + 1. Generate version ID (timestamp-based or semantic versioning) + 2. Calculate training data hash for reproducibility + 3. Extract performance metrics from training metadata + 4. Determine parent version (if this is an update) + 5. Create ModelVersion object with all metadata + 6. Store in model registry + 7. Update lineage tracking + 8. Return the registered version + + EXAMPLE USAGE: + ```python + metadata = { + "training_accuracy": 0.94, + "validation_accuracy": 0.91, + "training_time": 3600, + "data_sources": ["customer_data_v2", "external_features_v1"] + } + version = profiler.register_model_version("recommendation_model", model, metadata) + ``` + + IMPLEMENTATION HINTS: + - Use timestamp for version ID: f"{model_name}_v{timestamp}" + - Hash training metadata for data lineage + - Extract standard metrics (accuracy, loss, etc.) + - Find most recent version as parent + """ + ### BEGIN SOLUTION + # Generate version ID + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + version_id = f"{model_name}_v{timestamp}" + + # Calculate training data hash + training_data_str = json.dumps(training_metadata.get("data_sources", []), sort_keys=True) + training_data_hash = str(hash(training_data_str)) + + # Extract performance metrics + performance_metrics = { + "training_accuracy": training_metadata.get("training_accuracy", 0.0), + "validation_accuracy": training_metadata.get("validation_accuracy", 0.0), + "test_accuracy": training_metadata.get("test_accuracy", 0.0), + "training_loss": training_metadata.get("training_loss", 0.0), + "training_time": training_metadata.get("training_time", 0.0) + } + + # Determine parent version + parent_version = None + if self.model_versions[model_name]: + parent_version = self.model_versions[model_name][-1].version_id + + # Create model version + model_version = ModelVersion( + version_id=version_id, + model_name=model_name, + created_at=datetime.now(), + training_data_hash=training_data_hash, + performance_metrics=performance_metrics, + parent_version=parent_version, + tags=training_metadata.get("tags", {}), + deployment_config=training_metadata.get("deployment_config", {}) + ) + + # Store in registry + self.model_versions[model_name].append(model_version) + + return model_version + ### END SOLUTION + + def create_continuous_training_pipeline(self, pipeline_config: Dict[str, Any]) -> Dict[str, Any]: + """ + TODO: Create a continuous training pipeline configuration. + + STEP-BY-STEP IMPLEMENTATION: + 1. Validate pipeline configuration parameters + 2. Set up training schedule (cron-style or trigger-based) + 3. Configure data pipeline (sources, preprocessing, validation) + 4. Set up model training workflow (hyperparameters, resources) + 5. Configure validation and testing procedures + 6. Set up deployment automation + 7. Configure monitoring and alerting + 8. Return pipeline specification + + EXAMPLE USAGE: + ```python + config = { + "schedule": "0 2 * * 0", # Weekly at 2 AM Sunday + "data_sources": ["production_logs", "user_interactions"], + "training_config": {"epochs": 100, "batch_size": 32}, + "validation_split": 0.2, + "auto_deploy_threshold": 0.02 # 2% improvement + } + pipeline = profiler.create_continuous_training_pipeline(config) + ``` + + IMPLEMENTATION HINTS: + - Validate all required configuration parameters + - Set reasonable defaults for missing parameters + - Create comprehensive pipeline specification + - Include error handling and retry logic + """ + ### BEGIN SOLUTION + # Validate required parameters + required_params = ["schedule", "data_sources", "training_config"] + for param in required_params: + if param not in pipeline_config: + raise ValueError(f"Missing required parameter: {param}") + + # Create pipeline specification + pipeline_spec = { + "pipeline_id": f"ct_pipeline_{datetime.now().strftime('%Y%m%d_%H%M%S')}", + "system_name": self.system_name, + "created_at": datetime.now(), + + # Training schedule + "schedule": { + "type": "cron" if " " in pipeline_config["schedule"] else "trigger", + "expression": pipeline_config["schedule"], + "timezone": pipeline_config.get("timezone", "UTC") + }, + + # Data pipeline + "data_pipeline": { + "sources": pipeline_config["data_sources"], + "preprocessing": pipeline_config.get("preprocessing", ["normalize", "validate"]), + "validation_checks": pipeline_config.get("validation_checks", [ + "schema_validation", "data_quality", "drift_detection" + ]), + "data_retention": pipeline_config.get("data_retention", "30d") + }, + + # Model training + "training_workflow": { + "config": pipeline_config["training_config"], + "resources": pipeline_config.get("resources", {"cpu": 4, "memory": "8Gi"}), + "timeout": pipeline_config.get("timeout", 7200), # 2 hours + "retry_policy": pipeline_config.get("retry_policy", {"max_attempts": 3, "backoff": "exponential"}) + }, + + # Validation and testing + "validation": { + "validation_split": pipeline_config.get("validation_split", 0.2), + "test_split": pipeline_config.get("test_split", 0.1), + "success_criteria": pipeline_config.get("success_criteria", { + "min_accuracy": 0.85, + "max_training_time": 3600, + "max_model_size": "100MB" + }) + }, + + # Deployment automation + "deployment": { + "auto_deploy": pipeline_config.get("auto_deploy", True), + "deploy_threshold": pipeline_config.get("auto_deploy_threshold", 0.02), + "strategy": pipeline_config.get("deployment_strategy", "canary"), + "approval_required": pipeline_config.get("approval_required", False) + }, + + # Monitoring and alerting + "monitoring": { + "metrics": pipeline_config.get("monitoring_metrics", [ + "accuracy", "latency", "throughput", "error_rate" + ]), + "alert_channels": pipeline_config.get("alert_channels", self.alert_channels), + "alert_thresholds": pipeline_config.get("alert_thresholds", self.production_config["alert_thresholds"]) + } + } + + return pipeline_spec + ### END SOLUTION + + def detect_advanced_feature_drift(self, baseline_features: np.ndarray, current_features: np.ndarray, + feature_names: List[str]) -> Dict[str, Any]: + """ + TODO: Perform advanced feature drift detection using multiple statistical tests. + + STEP-BY-STEP IMPLEMENTATION: + 1. Validate input dimensions and feature names + 2. Perform multiple statistical tests per feature: + - Kolmogorov-Smirnov test for distribution changes + - Population Stability Index (PSI) for segmented analysis + - Jensen-Shannon divergence for distribution similarity + - Chi-square test for categorical features + 3. Calculate feature importance weights for drift impact + 4. Perform multivariate drift detection (covariance changes) + 5. Generate drift severity scores and recommendations + 6. Create comprehensive drift report + + EXAMPLE USAGE: + ```python + baseline = np.random.normal(0, 1, (10000, 20)) + current = np.random.normal(0.2, 1.1, (5000, 20)) + feature_names = [f"feature_{i}" for i in range(20)] + drift_report = profiler.detect_advanced_feature_drift(baseline, current, feature_names) + ``` + + IMPLEMENTATION HINTS: + - Use multiple statistical tests for robustness + - Weight drift by feature importance + - Calculate multivariate drift metrics + - Provide actionable recommendations + """ + ### BEGIN SOLUTION + # Validate inputs + if baseline_features.shape[1] != current_features.shape[1]: + raise ValueError("Feature dimensions must match") + if len(feature_names) != baseline_features.shape[1]: + raise ValueError("Feature names must match feature dimensions") + + n_features = baseline_features.shape[1] + drift_results = {} + severe_drift_count = 0 + moderate_drift_count = 0 + + # Per-feature drift analysis + for i, feature_name in enumerate(feature_names): + baseline_feature = baseline_features[:, i] + current_feature = current_features[:, i] + + # Statistical tests + feature_result = { + "feature_name": feature_name, + "baseline_stats": { + "mean": np.mean(baseline_feature), + "std": np.std(baseline_feature), + "min": np.min(baseline_feature), + "max": np.max(baseline_feature) + }, + "current_stats": { + "mean": np.mean(current_feature), + "std": np.std(current_feature), + "min": np.min(current_feature), + "max": np.max(current_feature) + } + } + + # Mean shift test + mean_shift = abs(np.mean(current_feature) - np.mean(baseline_feature)) / (np.std(baseline_feature) + 1e-8) + feature_result["mean_shift"] = mean_shift + feature_result["mean_shift_significant"] = mean_shift > 2.0 + + # Variance shift test + variance_ratio = np.std(current_feature) / (np.std(baseline_feature) + 1e-8) + feature_result["variance_ratio"] = variance_ratio + feature_result["variance_shift_significant"] = variance_ratio > 1.5 or variance_ratio < 0.67 + + # Population Stability Index (PSI) + try: + # Create bins for PSI calculation + bins = np.percentile(baseline_feature, [0, 10, 25, 50, 75, 90, 100]) + baseline_dist = np.histogram(baseline_feature, bins=bins)[0] + 1e-8 + current_dist = np.histogram(current_feature, bins=bins)[0] + 1e-8 + + # Normalize distributions + baseline_dist = baseline_dist / np.sum(baseline_dist) + current_dist = current_dist / np.sum(current_dist) + + # Calculate PSI + psi = np.sum((current_dist - baseline_dist) * np.log(current_dist / baseline_dist)) + feature_result["psi"] = psi + feature_result["psi_significant"] = psi > 0.2 # Industry standard threshold + except: + feature_result["psi"] = 0.0 + feature_result["psi_significant"] = False + + # Overall drift assessment + drift_indicators = [ + feature_result["mean_shift_significant"], + feature_result["variance_shift_significant"], + feature_result["psi_significant"] + ] + + drift_score = sum(drift_indicators) / len(drift_indicators) + + if drift_score >= 0.67: # 2 out of 3 tests + feature_result["drift_severity"] = "severe" + severe_drift_count += 1 + elif drift_score >= 0.33: # 1 out of 3 tests + feature_result["drift_severity"] = "moderate" + moderate_drift_count += 1 + else: + feature_result["drift_severity"] = "low" + + drift_results[feature_name] = feature_result + + # Multivariate drift analysis + try: + # Covariance matrix comparison + baseline_cov = np.cov(baseline_features.T) + current_cov = np.cov(current_features.T) + cov_diff = np.linalg.norm(current_cov - baseline_cov) / np.linalg.norm(baseline_cov) + multivariate_drift = cov_diff > 0.3 + except: + cov_diff = 0.0 + multivariate_drift = False + + # Generate recommendations + recommendations = [] + if severe_drift_count > 0: + recommendations.append(f"Investigate {severe_drift_count} features with severe drift") + recommendations.append("Consider immediate model retraining") + recommendations.append("Review data pipeline for upstream changes") + + if moderate_drift_count > n_features * 0.3: # More than 30% of features + recommendations.append("High proportion of features showing drift") + recommendations.append("Evaluate feature engineering pipeline") + + if multivariate_drift: + recommendations.append("Multivariate relationships have changed") + recommendations.append("Consider feature interaction analysis") + + # Overall assessment + overall_drift_severity = "low" + if severe_drift_count > 0 or multivariate_drift: + overall_drift_severity = "severe" + elif moderate_drift_count > n_features * 0.2: # More than 20% of features + overall_drift_severity = "moderate" + + return { + "timestamp": datetime.now(), + "overall_drift_severity": overall_drift_severity, + "severe_drift_count": severe_drift_count, + "moderate_drift_count": moderate_drift_count, + "total_features": n_features, + "multivariate_drift": multivariate_drift, + "covariance_difference": cov_diff, + "feature_drift_results": drift_results, + "recommendations": recommendations, + "drift_summary": { + "features_with_severe_drift": [name for name, result in drift_results.items() + if result["drift_severity"] == "severe"], + "features_with_moderate_drift": [name for name, result in drift_results.items() + if result["drift_severity"] == "moderate"] + } + } + ### END SOLUTION + + def orchestrate_deployment(self, model_version: ModelVersion, strategy_name: str = "canary") -> Dict[str, Any]: + """ + TODO: Orchestrate model deployment using specified strategy. + + STEP-BY-STEP IMPLEMENTATION: + 1. Validate model version and deployment strategy + 2. Get deployment strategy configuration + 3. Create deployment plan with phases + 4. Initialize traffic routing and monitoring + 5. Execute deployment phases with validation + 6. Monitor deployment health and success criteria + 7. Handle rollback if criteria not met + 8. Record deployment in history + + EXAMPLE USAGE: + ```python + deployment_result = profiler.orchestrate_deployment(model_version, "canary") + if deployment_result["success"]: + print(f"Deployment {deployment_result['deployment_id']} successful") + ``` + + IMPLEMENTATION HINTS: + - Validate strategy exists in self.deployment_strategies + - Create unique deployment_id + - Simulate deployment phases + - Check success criteria at each phase + - Handle rollback scenarios + """ + ### BEGIN SOLUTION + # Validate inputs + if strategy_name not in self.deployment_strategies: + raise ValueError(f"Unknown deployment strategy: {strategy_name}") + + strategy = self.deployment_strategies[strategy_name] + deployment_id = f"deploy_{model_version.version_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" + + # Create deployment plan + deployment_plan = { + "deployment_id": deployment_id, + "model_version": model_version, + "strategy": strategy, + "start_time": datetime.now(), + "phases": [], + "status": "in_progress" + } + + # Execute deployment phases + success = True + rollback_required = False + + try: + # Phase 1: Pre-deployment validation + phase1_result = { + "phase": "pre_deployment_validation", + "start_time": datetime.now(), + "checks": { + "model_validation": True, + "infrastructure_ready": True, + "dependencies_satisfied": True + }, + "success": True + } + deployment_plan["phases"].append(phase1_result) + + # Phase 2: Initial deployment (with traffic split) + if strategy.strategy_type == "canary": + # Canary deployment + phase2_result = { + "phase": "canary_deployment", + "start_time": datetime.now(), + "traffic_split": strategy.traffic_split, + "monitoring_window": strategy.monitoring_window, + "metrics": { + "accuracy": np.random.uniform(0.88, 0.95), + "latency": np.random.uniform(300, 450), + "error_rate": np.random.uniform(0.01, 0.03) + } + } + + # Check success criteria + metrics = phase2_result["metrics"] + criteria_met = ( + metrics["accuracy"] >= strategy.success_criteria["accuracy"] and + metrics["latency"] <= strategy.success_criteria["latency"] and + metrics["error_rate"] <= strategy.success_criteria["error_rate"] + ) + + phase2_result["success"] = criteria_met + deployment_plan["phases"].append(phase2_result) + + if not criteria_met: + rollback_required = True + success = False + + elif strategy.strategy_type == "blue_green": + # Blue-green deployment + phase2_result = { + "phase": "blue_green_deployment", + "start_time": datetime.now(), + "environment": "green", + "validation_tests": { + "smoke_tests": True, + "integration_tests": True, + "performance_tests": True + }, + "success": True + } + deployment_plan["phases"].append(phase2_result) + + # Phase 3: Full rollout (if canary successful) + if success and strategy.strategy_type == "canary": + phase3_result = { + "phase": "full_rollout", + "start_time": datetime.now(), + "traffic_split": {"current": 0.0, "new": 1.0}, + "success": True + } + deployment_plan["phases"].append(phase3_result) + + # Phase 4: Post-deployment monitoring + if success: + phase4_result = { + "phase": "post_deployment_monitoring", + "start_time": datetime.now(), + "monitoring_duration": 3600, # 1 hour + "alerts_triggered": 0, + "success": True + } + deployment_plan["phases"].append(phase4_result) + + # Update active deployment + self.active_deployments[deployment_id] = model_version + + except Exception as e: + success = False + rollback_required = True + deployment_plan["error"] = str(e) + + # Handle rollback if needed + if rollback_required: + rollback_result = { + "phase": "rollback", + "start_time": datetime.now(), + "reason": "Success criteria not met" if not success else "Error during deployment", + "success": True + } + deployment_plan["phases"].append(rollback_result) + + # Finalize deployment + deployment_plan["end_time"] = datetime.now() + deployment_plan["status"] = "success" if success else "failed" + deployment_plan["rollback_executed"] = rollback_required + + # Record in history + self.deployment_history.append(deployment_plan) + + return { + "deployment_id": deployment_id, + "success": success, + "strategy_used": strategy_name, + "rollback_required": rollback_required, + "phases_completed": len(deployment_plan["phases"]), + "deployment_plan": deployment_plan + } + ### END SOLUTION + + def handle_production_incident(self, incident_data: Dict[str, Any]) -> Dict[str, Any]: + """ + TODO: Handle production incidents with automated response. + + STEP-BY-STEP IMPLEMENTATION: + 1. Classify incident severity and type + 2. Execute automated recovery procedures + 3. Determine if escalation is required + 4. Log incident and response actions + 5. Monitor recovery success + 6. Generate incident report + + EXAMPLE USAGE: + ```python + incident = { + "type": "performance_degradation", + "severity": "high", + "metrics": {"accuracy": 0.75, "latency": 800, "error_rate": 0.15}, + "affected_models": ["recommendation_model_v20240101"] + } + response = profiler.handle_production_incident(incident) + ``` + + IMPLEMENTATION HINTS: + - Classify incidents by type and severity + - Execute appropriate recovery actions + - Log all actions for audit trail + - Determine escalation requirements + """ + ### BEGIN SOLUTION + incident_id = f"incident_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{len(self.incident_log)}" + incident_start = datetime.now() + + # Classify incident + incident_type = incident_data.get("type", "unknown") + severity = incident_data.get("severity", "medium") + affected_models = incident_data.get("affected_models", []) + metrics = incident_data.get("metrics", {}) + + # Initialize response + response_actions = [] + escalation_required = False + recovery_successful = False + + # Automated recovery procedures + if incident_type == "performance_degradation": + # Check if metrics breach rollback criteria + accuracy = metrics.get("accuracy", 1.0) + latency = metrics.get("latency", 0) + error_rate = metrics.get("error_rate", 0) + + rollback_needed = ( + accuracy < 0.80 or # Critical accuracy threshold + latency > 1000 or # Critical latency threshold + error_rate > 0.10 # Critical error rate threshold + ) + + if rollback_needed and self.rollback_policies["auto_rollback_enabled"]: + # Execute automatic rollback + response_actions.append({ + "action": "automatic_rollback", + "timestamp": datetime.now(), + "details": "Rolling back to previous stable version", + "success": True + }) + recovery_successful = True + + # Scale resources if needed + if latency > 600: + response_actions.append({ + "action": "scale_resources", + "timestamp": datetime.now(), + "details": "Increasing compute resources", + "success": True + }) + + elif incident_type == "data_drift": + # Trigger retraining pipeline + response_actions.append({ + "action": "trigger_retraining", + "timestamp": datetime.now(), + "details": "Initiating continuous training pipeline", + "success": True + }) + + # Increase monitoring frequency + response_actions.append({ + "action": "increase_monitoring", + "timestamp": datetime.now(), + "details": "Reducing monitoring interval to 1 minute", + "success": True + }) + + elif incident_type == "system_failure": + # Restart affected services + response_actions.append({ + "action": "restart_services", + "timestamp": datetime.now(), + "details": "Restarting inference endpoints", + "success": True + }) + + # Health check after restart + response_actions.append({ + "action": "health_check", + "timestamp": datetime.now(), + "details": "Validating service health post-restart", + "success": True + }) + recovery_successful = True + + # Determine escalation requirements + if severity == "critical" or not recovery_successful: + escalation_required = True + + # Find appropriate escalation level + escalation_level = 1 + if severity == "critical": + escalation_level = 2 + if incident_type == "security_breach": + escalation_level = 3 + + response_actions.append({ + "action": "escalate_incident", + "timestamp": datetime.now(), + "details": f"Escalating to level {escalation_level}", + "escalation_level": escalation_level, + "contacts": self.escalation_rules[escalation_level - 1]["contacts"], + "success": True + }) + + # Create incident record + incident_record = { + "incident_id": incident_id, + "incident_type": incident_type, + "severity": severity, + "start_time": incident_start, + "end_time": datetime.now(), + "affected_models": affected_models, + "metrics": metrics, + "response_actions": response_actions, + "escalation_required": escalation_required, + "recovery_successful": recovery_successful, + "resolution_time": (datetime.now() - incident_start).total_seconds() + } + + # Log incident + self.incident_log.append(incident_record) + + return { + "incident_id": incident_id, + "response_actions_taken": len(response_actions), + "recovery_successful": recovery_successful, + "escalation_required": escalation_required, + "resolution_time_seconds": incident_record["resolution_time"], + "incident_record": incident_record + } + ### END SOLUTION + + def generate_mlops_governance_report(self) -> Dict[str, Any]: + """ + TODO: Generate comprehensive MLOps governance and compliance report. + + STEP-BY-STEP IMPLEMENTATION: + 1. Collect model registry statistics + 2. Analyze deployment history and patterns + 3. Review incident response effectiveness + 4. Calculate system reliability metrics + 5. Assess compliance with policies + 6. Generate actionable recommendations + + EXAMPLE RETURN: + ```python + { + "report_date": datetime(2024, 1, 1), + "system_health_score": 0.92, + "model_registry_stats": {...}, + "deployment_success_rate": 0.95, + "incident_response_metrics": {...}, + "compliance_status": "compliant", + "recommendations": ["Improve deployment automation", ...] + } + ``` + """ + ### BEGIN SOLUTION + report_date = datetime.now() + + # Model registry statistics + total_models = len(self.model_versions) + total_versions = sum(len(versions) for versions in self.model_versions.values()) + active_deployments_count = len(self.active_deployments) + + model_registry_stats = { + "total_models": total_models, + "total_versions": total_versions, + "active_deployments": active_deployments_count, + "average_versions_per_model": total_versions / max(total_models, 1) + } + + # Deployment history analysis + total_deployments = len(self.deployment_history) + successful_deployments = sum(1 for d in self.deployment_history if d["status"] == "success") + deployment_success_rate = successful_deployments / max(total_deployments, 1) + + rollback_count = sum(1 for d in self.deployment_history if d.get("rollback_executed", False)) + rollback_rate = rollback_count / max(total_deployments, 1) + + deployment_metrics = { + "total_deployments": total_deployments, + "success_rate": deployment_success_rate, + "rollback_rate": rollback_rate, + "average_deployment_time": 1800 if total_deployments > 0 else 0 # Simulated + } + + # Incident response analysis + total_incidents = len(self.incident_log) + if total_incidents > 0: + resolved_incidents = sum(1 for i in self.incident_log if i["recovery_successful"]) + average_resolution_time = np.mean([i["resolution_time"] for i in self.incident_log]) + escalation_rate = sum(1 for i in self.incident_log if i["escalation_required"]) / total_incidents + else: + resolved_incidents = 0 + average_resolution_time = 0 + escalation_rate = 0 + + incident_metrics = { + "total_incidents": total_incidents, + "resolution_rate": resolved_incidents / max(total_incidents, 1), + "average_resolution_time": average_resolution_time, + "escalation_rate": escalation_rate + } + + # System health score calculation + health_components = { + "deployment_success": deployment_success_rate, + "incident_resolution": incident_metrics["resolution_rate"], + "system_availability": 0.995, # Simulated high availability + "monitoring_coverage": 0.90 # Simulated monitoring coverage + } + + system_health_score = np.mean(list(health_components.values())) + + # Compliance assessment + compliance_checks = { + "model_versioning": total_versions > 0, + "deployment_automation": deployment_success_rate > 0.9, + "incident_response": average_resolution_time < 1800, # 30 minutes + "monitoring_enabled": len(self.performance_monitors) > 0, + "rollback_capability": self.rollback_policies["auto_rollback_enabled"] + } + + compliance_score = sum(compliance_checks.values()) / len(compliance_checks) + compliance_status = "compliant" if compliance_score >= 0.8 else "non_compliant" + + # Generate recommendations + recommendations = [] + + if deployment_success_rate < 0.95: + recommendations.append("Improve deployment automation and testing") + + if rollback_rate > 0.10: + recommendations.append("Enhance pre-deployment validation") + + if incident_metrics["escalation_rate"] > 0.20: + recommendations.append("Improve automated incident response procedures") + + if system_health_score < 0.90: + recommendations.append("Review overall system reliability and monitoring") + + if not compliance_checks["monitoring_enabled"]: + recommendations.append("Implement comprehensive monitoring coverage") + + return { + "report_date": report_date, + "system_name": self.system_name, + "reporting_period": "all_time", # Could be configurable + + "system_health_score": system_health_score, + "health_components": health_components, + + "model_registry_stats": model_registry_stats, + "deployment_metrics": deployment_metrics, + "incident_response_metrics": incident_metrics, + + "compliance_status": compliance_status, + "compliance_score": compliance_score, + "compliance_checks": compliance_checks, + + "recommendations": recommendations, + + "summary": { + "models_managed": total_models, + "deployments_executed": total_deployments, + "incidents_handled": total_incidents, + "overall_reliability": "high" if system_health_score > 0.9 else "medium" if system_health_score > 0.8 else "low" + } + } + ### END SOLUTION + +# %% [markdown] +""" +### ๐Ÿงช Test Your Production MLOps Profiler + +Once you implement the `ProductionMLOpsProfiler` class above, run this cell to test it: +""" + +# %% nbgrader={"grade": true, "grade_id": "test-production-mlops-profiler", "locked": true, "points": 40, "schema_version": 3, "solution": false, "task": false} +def test_unit_production_mlops_profiler(): + """Test ProductionMLOpsProfiler implementation""" + print("๐Ÿ”ฌ Unit Test: Production MLOps Profiler...") + + # Test initialization + config = { + "monitoring_interval": 300, + "alert_thresholds": {"accuracy": 0.85, "latency": 500}, + "auto_rollback": True + } + profiler = ProductionMLOpsProfiler("test_system", config) + + assert profiler.system_name == "test_system" + assert profiler.production_config["monitoring_interval"] == 300 + assert "canary" in profiler.deployment_strategies + assert "blue_green" in profiler.deployment_strategies + + # Test model version registration + metadata = { + "training_accuracy": 0.94, + "validation_accuracy": 0.91, + "training_time": 3600, + "data_sources": ["dataset_v1", "features_v2"] + } + model_version = profiler.register_model_version("test_model", "mock_model", metadata) + + assert model_version.model_name == "test_model" + assert model_version.performance_metrics["training_accuracy"] == 0.94 + assert "test_model" in profiler.model_versions + assert len(profiler.model_versions["test_model"]) == 1 + + # Test continuous training pipeline + pipeline_config = { + "schedule": "0 2 * * 0", + "data_sources": ["production_logs"], + "training_config": {"epochs": 100}, + "auto_deploy_threshold": 0.02 + } + pipeline_spec = profiler.create_continuous_training_pipeline(pipeline_config) + + assert "pipeline_id" in pipeline_spec + assert pipeline_spec["schedule"]["expression"] == "0 2 * * 0" + assert "training_workflow" in pipeline_spec + assert "deployment" in pipeline_spec + + # Test advanced feature drift detection + baseline_features = np.random.normal(0, 1, (1000, 5)) + current_features = np.random.normal(0.3, 1.2, (500, 5)) # Shifted data + feature_names = [f"feature_{i}" for i in range(5)] + + drift_report = profiler.detect_advanced_feature_drift(baseline_features, current_features, feature_names) + + assert "overall_drift_severity" in drift_report + assert "feature_drift_results" in drift_report + assert "recommendations" in drift_report + assert len(drift_report["feature_drift_results"]) == 5 + + # Test deployment orchestration + deployment_result = profiler.orchestrate_deployment(model_version, "canary") + + assert "deployment_id" in deployment_result + assert "success" in deployment_result + assert "strategy_used" in deployment_result + assert deployment_result["strategy_used"] == "canary" + + # Test production incident handling + incident_data = { + "type": "performance_degradation", + "severity": "high", + "metrics": {"accuracy": 0.75, "latency": 800, "error_rate": 0.15}, + "affected_models": [model_version.version_id] + } + incident_response = profiler.handle_production_incident(incident_data) + + assert "incident_id" in incident_response + assert "response_actions_taken" in incident_response + assert "recovery_successful" in incident_response + assert len(profiler.incident_log) == 1 + + # Test governance report + governance_report = profiler.generate_mlops_governance_report() + + assert "system_health_score" in governance_report + assert "model_registry_stats" in governance_report + assert "deployment_metrics" in governance_report + assert "incident_response_metrics" in governance_report + assert "compliance_status" in governance_report + assert "recommendations" in governance_report + + print("โœ… Production MLOps Profiler initialization works correctly") + print("โœ… Model version registration and lineage tracking work") + print("โœ… Continuous training pipeline creation works") + print("โœ… Advanced feature drift detection works") + print("โœ… Deployment orchestration with strategies works") + print("โœ… Production incident handling works") + print("โœ… MLOps governance reporting works") + print("๐Ÿ“ˆ Progress: Production MLOps Profiler โœ“") + +# Run the test +test_unit_production_mlops_profiler() + +# %% [markdown] +""" +## ๐ŸŽฏ COMPREHENSIVE ML SYSTEMS THINKING QUESTIONS + +Now that you've implemented a production-grade MLOps system, let's explore the deeper implications for enterprise ML systems: + +### ๐Ÿ—๏ธ Production ML Deployment Strategies + +**Real-World Deployment Patterns:** +- How do canary deployments compare to blue-green deployments in terms of risk, complexity, and resource requirements? +- When would you choose A/B testing over canary deployments for model updates? +- How do major tech companies like Netflix and Uber handle model deployment at scale? + +**Infrastructure Considerations:** +- What are the trade-offs between containerized deployments (Docker/Kubernetes) vs. serverless (Lambda/Cloud Functions) for ML models? +- How does edge deployment (mobile devices, IoT) change your MLOps strategy? +- What role does model serving infrastructure (TensorFlow Serving, Seldon, KFServing) play in production systems? + +**Risk Management:** +- How would you design a deployment strategy for a safety-critical system (autonomous vehicles, medical diagnosis)? +- What are the key differences between deploying ML models vs. traditional software? +- How do you balance deployment speed with safety in production ML systems? + +### ๐Ÿ” Model Governance and Compliance + +**Regulatory Requirements:** +- How do GDPR "right to explanation" requirements affect your model versioning and lineage tracking? +- What additional governance features would be needed for FDA-regulated medical ML systems? +- How does model governance differ between financial services (risk models) and consumer applications? + +**Enterprise Policies:** +- How would you implement model approval workflows for enterprise environments? +- What role does model interpretability play in production governance? +- How do you handle model bias detection and mitigation in production systems? + +**Audit and Compliance:** +- What information would auditors need from your MLOps system? +- How do you ensure reproducibility of model training across different environments? +- What are the key compliance differences between on-premise and cloud MLOps? + +### ๐Ÿข MLOps Platform Design + +**Platform Architecture:** +- How would you design an MLOps platform to serve multiple teams with different ML frameworks (PyTorch, TensorFlow, scikit-learn)? +- What are the pros and cons of building vs. buying MLOps infrastructure? +- How do you handle resource allocation and cost management in multi-tenant MLOps platforms? + +**Integration Patterns:** +- How does MLOps integrate with existing CI/CD pipelines and DevOps practices? +- What are the key differences between MLOps and traditional DevOps? +- How do you handle data pipeline integration with model training and deployment? + +**Scalability Considerations:** +- How would you design an MLOps system to handle thousands of models across hundreds of teams? +- What are the bottlenecks in scaling ML model training and deployment? +- How do you handle cross-region deployment and disaster recovery for ML systems? + +### ๐Ÿšจ Incident Response and Debugging + +**Production Incidents:** +- What are the most common types of ML production incidents, and how do they differ from traditional software incidents? +- How would you design an incident response playbook specifically for ML systems? +- What metrics would you monitor to detect ML-specific issues (data drift, model degradation, bias drift)? + +**Debugging Strategies:** +- How do you debug a model that was working yesterday but is performing poorly today? +- What tools and techniques help diagnose issues in production ML pipelines? +- How do you distinguish between data issues, model issues, and infrastructure issues? + +**Recovery Procedures:** +- What are the key considerations for automated vs. manual rollback of ML models? +- How do you handle incidents where multiple models are interdependent? +- What role does feature store health play in ML incident response? + +### ๐Ÿ—๏ธ Enterprise ML Infrastructure + +**Resource Management:** +- How do you optimize compute costs for ML training and inference workloads? +- What are the trade-offs between GPU clusters, cloud ML services, and specialized ML hardware? +- How do you handle resource scheduling for batch training vs. real-time inference? + +**Data Infrastructure:** +- How does feature store architecture impact MLOps design? +- What are the key considerations for real-time vs. batch feature computation? +- How do you handle data versioning and lineage in production ML systems? + +**Security and Privacy:** +- What are the unique security challenges of ML systems compared to traditional applications? +- How do you implement differential privacy in production ML pipelines? +- What role does federated learning play in enterprise MLOps strategies? + +These questions connect your MLOps implementation to real-world enterprise challenges. Consider how the patterns you've implemented would scale to handle Netflix's recommendation systems, Tesla's autonomous driving models, or Google's search ranking algorithms. +""" + # %% [markdown] """ ## ๐ŸŽฏ MODULE SUMMARY: MLOps and Production Systems -Congratulations! You've successfully implemented MLOps and production systems: +Congratulations! You've successfully implemented enterprise-grade MLOps and production systems: ### What You've Accomplished -โœ… **Model Lifecycle Management**: Registry, versioning, and metadata tracking -โœ… **Production Serving**: Scalable inference endpoints and monitoring -โœ… **Monitoring Systems**: Comprehensive tracking and alerting -โœ… **A/B Testing Framework**: Experimental design and validation -โœ… **Continuous Learning**: Automated retraining and deployment -โœ… **Integration**: Real-world MLOps with TinyTorch models +โœ… **Performance Drift Monitoring**: Real-time model health tracking with automated alerting +โœ… **Feature Drift Detection**: Statistical analysis of data distribution changes +โœ… **Automated Retraining**: Trigger-based model retraining with validation +โœ… **Complete MLOps Pipeline**: End-to-end integration of all MLOps components +โœ… **Production MLOps Profiler**: Enterprise-grade model lifecycle management +โœ… **Deployment Orchestration**: Canary and blue-green deployment strategies +โœ… **Incident Response**: Automated detection and recovery procedures +โœ… **Governance and Compliance**: Comprehensive audit trails and reporting ### Key Concepts You've Learned -- **Model lifecycle management**: Tracking, versioning, and metadata -- **Production serving**: Scalable endpoints and monitoring -- **Monitoring and observability**: Tracking, alerting, and drift detection -- **A/B testing**: Experimental design and statistical validation -- **Continuous learning**: Automated retraining and deployment -- **Integration patterns**: How MLOps works with neural networks +- **Model lifecycle management**: Complete tracking from development to retirement +- **Production monitoring**: Multi-dimensional performance and health tracking +- **Automated deployment**: Safe rollout strategies with automated rollback +- **Feature drift detection**: Advanced statistical analysis for data changes +- **Incident response**: Automated detection, response, and escalation +- **Enterprise governance**: Compliance, audit trails, and policy enforcement ### Professional Skills Developed -- **MLOps engineering**: Building robust production systems -- **Monitoring and alerting**: Ensuring reliability and performance -- **Experimentation**: Designing and validating experiments -- **Continuous improvement**: Automating retraining and deployment -- **Integration testing**: Ensuring MLOps works with neural networks +- **MLOps engineering**: Building robust, scalable production systems +- **Production deployment**: Safe model rollout strategies and risk management +- **Monitoring and observability**: Comprehensive system health tracking +- **Incident management**: Automated response and recovery procedures +- **Enterprise architecture**: Scalable, compliant MLOps platform design -### Ready for Advanced Applications +### Ready for Enterprise Applications Your MLOps implementations now enable: -- **Enterprise deployment**: Managing models at scale -- **Production monitoring**: Ensuring reliability and performance -- **Continuous improvement**: Automated retraining and deployment -- **Research and experimentation**: Validating new ideas in production +- **Enterprise-scale deployment**: Managing hundreds of models across teams +- **Regulatory compliance**: Meeting audit and governance requirements +- **High-availability systems**: Production-grade reliability and monitoring +- **Automated operations**: Self-healing and self-maintaining ML systems ### Connection to Real ML Systems -Your implementations mirror production systems: -- **MLflow**: Model registry and lifecycle management -- **Seldon Core**: Production serving and monitoring -- **TensorFlow Extended (TFX)**: End-to-end MLOps pipelines -- **Industry Standard**: Every major ML framework uses these exact patterns +Your implementations mirror industry-leading platforms: +- **MLflow**: Model registry and experiment tracking +- **Kubeflow**: Kubernetes-native ML workflows +- **TensorFlow Extended (TFX)**: End-to-end ML production pipelines +- **Seldon Core**: Advanced deployment and monitoring +- **AWS SageMaker**: Comprehensive MLOps platform ### Next Steps 1. **Export your code**: `tito export 15_mlops` 2. **Test your implementation**: `tito test 15_mlops` 3. **Deploy models**: Use MLOps for production deployment -4. **Move to Capstone**: Integrate the full TinyTorch ecosystem! +4. **Capstone Project**: Integrate the complete TinyTorch ecosystem! -**Ready for the capstone?** Your MLOps systems are now ready for real-world production! +**Ready for enterprise MLOps?** Your production systems are now ready for real-world deployment at scale! """ \ No newline at end of file