diff --git a/.env.example b/.env.example index 77dc153..a66fd59 100644 --- a/.env.example +++ b/.env.example @@ -1,10 +1,9 @@ NODE_ENV=production NEXT_PUBLIC_AUTH_REDIRECT=http://app.flowsint.localhost/auth/callback -HIBP_API_KEY=70b78a3256c84d09b79cd4953d77bdf3 +HIBP_API_KEY= NEXT_PUBLIC_FLOWSINT_API=https://api.flowsint.localhost/api NEXT_PUBLIC_DOCKER_FLOWSINT_API=http://flowsint-api:5000/api -NEXT_PUBLIC_SUPABASE_ANON_KEY=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6InR5bGhuc2F5eXRhb2FhaXFzZ2RwIiwicm9sZSI6ImFub24iLCJpYXQiOjE3MzgzMzY3MzAsImV4cCI6MjA1MzkxMjczMH0.iVcpz8RpOgzVSamp_tNQQmjdLL9_Olx6m4LfsLVw1bg -AUTH_SECRET="superscretchangeitplz" # Added by `npx auth`. Read more: https://cli.authjs.dev +AUTH_SECRET="superscretchangeitplz" AUTH_TRUST_HOST=true NEO4J_URI_BOLT=bolt://neo4j:7687 NEO4J_URI_WEB=https://neo4j.flowsint.localhost diff --git a/flowsint-api/alembic/versions/661ff8ef4425_rename_transforms_to_flows.py b/flowsint-api/alembic/versions/661ff8ef4425_rename_transforms_to_flows.py new file mode 100644 index 0000000..60f2fc1 --- /dev/null +++ b/flowsint-api/alembic/versions/661ff8ef4425_rename_transforms_to_flows.py @@ -0,0 +1,36 @@ +"""rename_transforms_to_flows + +Revision ID: 661ff8ef4425 +Revises: 9a3b9a199aa8 +Create Date: 2025-08-15 16:16:12.792775 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = '661ff8ef4425' +down_revision: Union[str, None] = '9a3b9a199aa8' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # Rename the table from 'transforms' to 'flows' + op.rename_table('transforms', 'flows') + + # Rename the column from 'transform_schema' to 'flow_schema' + op.alter_column('flows', 'transform_schema', new_column_name='flow_schema') + + +def downgrade() -> None: + """Downgrade schema.""" + # Rename the column back from 'flow_schema' to 'transform_schema' + op.alter_column('flows', 'flow_schema', new_column_name='transform_schema') + + # Rename the table back from 'flows' to 'transforms' + op.rename_table('flows', 'transforms') diff --git a/flowsint-api/app/api/routes/flows.py b/flowsint-api/app/api/routes/flows.py new file mode 100644 index 0000000..2c5bed9 --- /dev/null +++ b/flowsint-api/app/api/routes/flows.py @@ -0,0 +1,548 @@ +from uuid import UUID, uuid4 +from fastapi import APIRouter, HTTPException, Depends, status, Query +from typing import Dict, List, Any, Optional +from pydantic import BaseModel +from datetime import datetime +from flowsint_core.utils import extract_input_schema_flow +from flowsint_core.core.registry import TransformRegistry +from flowsint_core.core.celery import celery +from flowsint_types import Domain, Phrase, Ip, SocialProfile, Organization, Email +from flowsint_core.core.types import Node, Edge, FlowStep, FlowBranch +from sqlalchemy.orm import Session +from flowsint_core.core.postgre_db import get_db +from app.models.models import Flow, Profile +from app.api.deps import get_current_user +from app.api.schemas.flow import FlowRead, FlowCreate, FlowUpdate +from flowsint_types import ( + ASN, + CIDR, + CryptoWallet, + CryptoWalletTransaction, + CryptoNFT, + Website, + Individual, +) + + +class FlowComputationRequest(BaseModel): + nodes: List[Node] + edges: List[Edge] + inputType: Optional[str] = None + + +class FlowComputationResponse(BaseModel): + flowBranches: List[FlowBranch] + initialData: Any + + +class StepSimulationRequest(BaseModel): + flowBranches: List[FlowBranch] + currentStepIndex: int + + +class launchFlowPayload(BaseModel): + values: List[str] + sketch_id: str + + +router = APIRouter() + + +# Get the list of all flows +@router.get("/", response_model=List[FlowRead]) +def get_flows( + category: Optional[str] = Query(None), + db: Session = Depends(get_db), + current_user: Profile = Depends(get_current_user), +): + query = db.query(Flow) + + if category is not None and category != "undefined": + # Case-insensitive filtering by checking if any category matches (case-insensitive) + flows = query.all() + return [ + flow + for flow in flows + if any(cat.lower() == category.lower() for cat in flow.category) + ] + + return query.order_by(Flow.last_updated_at.desc()).all() + + +# Returns the "raw_materials" for the flow editor +@router.get("/raw_materials") +async def get_material_list(): + scanners = TransformRegistry.list_by_categories() + scanner_categories = { + category: [ + { + "class_name": scanner.get("class_name"), + "category": scanner.get("category"), + "name": scanner.get("name"), + "module": scanner.get("module"), + "documentation": scanner.get("documentation"), + "description": scanner.get("description"), + "inputs": scanner.get("inputs"), + "outputs": scanner.get("outputs"), + "type": "scanner", + "params": scanner.get("params"), + "params_schema": scanner.get("params_schema"), + "required_params": scanner.get("required_params"), + "icon": scanner.get("icon"), + } + for scanner in scanner_list + ] + for category, scanner_list in scanners.items() + } + + object_inputs = [ + extract_input_schema_flow(Phrase), + extract_input_schema_flow(Organization), + extract_input_schema_flow(Individual), + extract_input_schema_flow(Domain), + extract_input_schema_flow(Website), + extract_input_schema_flow(Ip), + extract_input_schema_flow(ASN), + extract_input_schema_flow(CIDR), + extract_input_schema_flow(SocialProfile), + extract_input_schema_flow(Email), + extract_input_schema_flow(CryptoWallet), + extract_input_schema_flow(CryptoWalletTransaction), + extract_input_schema_flow(CryptoNFT), + ] + + # Put types first, then add all scanner categories + flattened_scanners = {"types": object_inputs} + flattened_scanners.update(scanner_categories) + + return {"items": flattened_scanners} + + +# Returns the "raw_materials" for the flow editor +@router.get("/input_type/{input_type}") +async def get_material_list(input_type: str): + transforms = TransformRegistry.list_by_input_type(input_type) + return {"items": transforms} + + +# Create a new flow +@router.post("/create", response_model=FlowRead, status_code=status.HTTP_201_CREATED) +def create_flow( + payload: FlowCreate, + db: Session = Depends(get_db), + current_user: Profile = Depends(get_current_user), +): + + new_flow = Flow( + id=uuid4(), + name=payload.name, + description=payload.description, + category=payload.category, + flow_schema=payload.flow_schema, + created_at=datetime.utcnow(), + last_updated_at=datetime.utcnow(), + ) + db.add(new_flow) + db.commit() + db.refresh(new_flow) + return new_flow + + +# Get a flow by ID +@router.get("/{flow_id}", response_model=FlowRead) +def get_flow_by_id( + flow_id: UUID, + db: Session = Depends(get_db), + current_user: Profile = Depends(get_current_user), +): + flow = db.query(Flow).filter(Flow.id == flow_id).first() + if not flow: + raise HTTPException(status_code=404, detail="flow not found") + return flow + + +# Update a flow by ID +@router.put("/{flow_id}", response_model=FlowRead) +def update_flow( + flow_id: UUID, + payload: FlowUpdate, + db: Session = Depends(get_db), + current_user: Profile = Depends(get_current_user), +): + flow = db.query(Flow).filter(Flow.id == flow_id).first() + if not flow: + raise HTTPException(status_code=404, detail="flow not found") + update_data = payload.dict(exclude_unset=True) + for key, value in update_data.items(): + print(f"only update {key}") + if key == "category": + if "SocialProfile" in value: + value.append("Username") + setattr(flow, key, value) + + flow.last_updated_at = datetime.utcnow() + + db.commit() + db.refresh(flow) + return flow + + +# Delete a flow by ID +@router.delete("/{flow_id}", status_code=status.HTTP_204_NO_CONTENT) +def delete_flow( + flow_id: UUID, + db: Session = Depends(get_db), + current_user: Profile = Depends(get_current_user), +): + flow = db.query(Flow).filter(Flow.id == flow_id).first() + if not flow: + raise HTTPException(status_code=404, detail="flow not found") + db.delete(flow) + db.commit() + return None + + +@router.post("/{flow_id}/launch") +async def launch_flow( + flow_id: str, + payload: launchFlowPayload, + db: Session = Depends(get_db), + current_user: Profile = Depends(get_current_user), +): + try: + flow = db.query(Flow).filter(Flow.id == flow_id).first() + if flow is None: + raise HTTPException(status_code=404, detail="flow not found") + nodes = [Node(**node) for node in flow.flow_schema["nodes"]] + edges = [Edge(**edge) for edge in flow.flow_schema["edges"]] + flow_branches = compute_flow_branches(payload.values, nodes, edges) + serializable_branches = [branch.model_dump() for branch in flow_branches] + task = celery.send_task( + "run_flow", + args=[ + serializable_branches, + payload.values, + payload.sketch_id, + str(current_user.id), + ], + ) + return {"id": task.id} + + except Exception as e: + print(e) + raise HTTPException(status_code=404, detail="flow not found") + + +@router.post("/{flow_id}/compute", response_model=FlowComputationResponse) +def compute_flows( + request: FlowComputationRequest, current_user: Profile = Depends(get_current_user) +): + initial_data = generate_sample_data(request.inputType or "string") + flow_branches = compute_flow_branches(initial_data, request.nodes, request.edges) + return FlowComputationResponse(flowBranches=flow_branches, initialData=initial_data) + + +def generate_sample_data(type_str: str) -> Any: + type_str = type_str.lower() if type_str else "string" + if type_str == "string": + return "sample_text" + elif type_str == "number": + return 42 + elif type_str == "boolean": + return True + elif type_str == "array": + return [1, 2, 3] + elif type_str == "object": + return {"key": "value"} + elif type_str == "url": + return "https://example.com" + elif type_str == "email": + return "user@example.com" + elif type_str == "domain": + return "example.com" + elif type_str == "ip": + return "192.168.1.1" + else: + return f"sample_{type_str}" + + +def compute_flow_branches( + initial_value: Any, nodes: List[Node], edges: List[Edge] +) -> List[FlowBranch]: + """Computes flow branches based on nodes and edges with proper DFS traversal""" + # Find input nodes (starting points) + input_nodes = [node for node in nodes if node.data.get("type") == "type"] + + if not input_nodes: + return [ + FlowBranch( + id="error", + name="Error", + steps=[ + FlowStep( + nodeId="error", + inputs={}, + type="error", + outputs={}, + status="error", + branchId="error", + depth=0, + ) + ], + ) + ] + + node_map = {node.id: node for node in nodes} + branches = [] + branch_counter = 0 + # Track scanner outputs across all branches + scanner_outputs = {} + + def calculate_path_length(start_node: str, visited: set = None) -> int: + """Calculate the shortest possible path length from a node to any leaf""" + if visited is None: + visited = set() + + if start_node in visited: + return float("inf") + + visited.add(start_node) + out_edges = [edge for edge in edges if edge.source == start_node] + + if not out_edges: + return 1 + + min_length = float("inf") + for edge in out_edges: + length = calculate_path_length(edge.target, visited.copy()) + min_length = min(min_length, length) + + return 1 + min_length + + def get_outgoing_edges(node_id: str) -> List[Edge]: + """Get outgoing edges sorted by the shortest possible path length""" + out_edges = [edge for edge in edges if edge.source == node_id] + # Sort edges by the length of the shortest possible path from their target + return sorted(out_edges, key=lambda e: calculate_path_length(e.target)) + + def create_step( + node_id: str, + branch_id: str, + depth: int, + input_data: Dict[str, Any], + is_input_node: bool, + outputs: Dict[str, Any], + node_params: Optional[Dict[str, Any]] = None, + ) -> FlowStep: + return FlowStep( + nodeId=node_id, + params=node_params, + inputs={} if is_input_node else input_data, + outputs=outputs, + type="type" if is_input_node else "scanner", + status="pending", + branchId=branch_id, + depth=depth, + ) + + def explore_branch( + current_node_id: str, + branch_id: str, + branch_name: str, + depth: int, + input_data: Dict[str, Any], + path: List[str], + branch_visited: set, + steps: List[FlowStep], + parent_outputs: Dict[str, Any] = None, + ) -> None: + nonlocal branch_counter + + # Skip if node is already in current path (cycle detection) + if current_node_id in path: + return + + current_node = node_map.get(current_node_id) + if not current_node: + return + + # Process node outputs + is_input_node = current_node.data.get("type") == "type" + if is_input_node: + outputs_array = current_node.data["outputs"].get("properties", []) + first_output_name = ( + outputs_array[0].get("name", "output") if outputs_array else "output" + ) + current_outputs = {first_output_name: initial_value} + else: + # Check if we already have outputs for this scanner + if current_node_id in scanner_outputs: + current_outputs = scanner_outputs[current_node_id] + else: + current_outputs = process_node_data(current_node, input_data) + # Store the outputs for future use + scanner_outputs[current_node_id] = current_outputs + + # Extract node parameters + node_params = current_node.data.get("params", {}) + + # Create and add current step + current_step = create_step( + current_node_id, + branch_id, + depth, + input_data, + is_input_node, + current_outputs, + node_params, + ) + steps.append(current_step) + path.append(current_node_id) + branch_visited.add(current_node_id) + + # Get all outgoing edges sorted by path length + out_edges = get_outgoing_edges(current_node_id) + + if not out_edges: + # Leaf node reached, save the branch + branches.append(FlowBranch(id=branch_id, name=branch_name, steps=steps[:])) + else: + # Process each outgoing edge in order of shortest path + for i, edge in enumerate(out_edges): + if edge.target in path: # Skip if would create cycle + continue + + # Prepare next node's input + output_key = edge.sourceHandle + if not output_key and current_outputs: + output_key = list(current_outputs.keys())[0] + + output_value = current_outputs.get(output_key) if output_key else None + if output_value is None and parent_outputs: + output_value = ( + parent_outputs.get(output_key) if output_key else None + ) + + next_input = {edge.targetHandle or "input": output_value} + + if i == 0: + # Continue in same branch (will be shortest path) + explore_branch( + edge.target, + branch_id, + branch_name, + depth + 1, + next_input, + path, + branch_visited, + steps, + current_outputs, + ) + else: + # Create new branch starting from current node + branch_counter += 1 + new_branch_id = f"{branch_id}-{branch_counter}" + new_branch_name = f"{branch_name} (Branch {branch_counter})" + new_steps = steps[: len(steps)] # Copy steps up to current node + new_branch_visited = ( + branch_visited.copy() + ) # Create new visited set for the branch + explore_branch( + edge.target, + new_branch_id, + new_branch_name, + depth + 1, + next_input, + path[:], # Create new path copy for branch + new_branch_visited, + new_steps, + current_outputs, + ) + + # Backtrack: remove current node from path and remove its step + path.pop() + steps.pop() + + # Start exploration from each input node + for index, input_node in enumerate(input_nodes): + branch_id = f"branch-{index}" + branch_name = f"Flow {index + 1}" if len(input_nodes) > 1 else "Main Flow" + explore_branch( + input_node.id, + branch_id, + branch_name, + 0, + {}, + [], # Use list for path to maintain order + set(), # Use set for visited to check membership + [], + None, + ) + + # Sort branches by length (number of steps) + branches.sort(key=lambda branch: len(branch.steps)) + return branches + + +def process_node_data(node: Node, inputs: Dict[str, Any]) -> Dict[str, Any]: + """Traite les données de nœud en fonction du type de nœud et des entrées""" + outputs = {} + output_types = node.data["outputs"].get("properties", []) + + for output in output_types: + output_name = output.get("name", "output") + class_name = node.data.get("class_name", "") + # For simulation purposes, we'll return a placeholder value based on the scanner type + if class_name in ["ReverseResolveScanner", "ResolveScanner"]: + # IP/Domain resolution scanners + outputs[output_name] = ( + "192.168.1.1" if "ip" in output_name.lower() else "example.com" + ) + elif class_name == "SubdomainScanner": + # Subdomain scanner + outputs[output_name] = f"sub.{inputs.get('input', 'example.com')}" + + elif class_name == "WhoisScanner": + # WHOIS scanner + outputs[output_name] = { + "domain": inputs.get("input", "example.com"), + "registrar": "Example Registrar", + "creation_date": "2020-01-01", + } + + elif class_name == "GeolocationScanner": + # Geolocation scanner + outputs[output_name] = { + "country": "France", + "city": "Paris", + "coordinates": {"lat": 48.8566, "lon": 2.3522}, + } + + elif class_name == "MaigretScanner": + # Social media scanner + outputs[output_name] = { + "username": inputs.get("input", "user123"), + "platforms": ["twitter", "github", "linkedin"], + } + + elif class_name == "HoleheScanner": + # Email verification scanner + outputs[output_name] = { + "email": inputs.get("input", "user@example.com"), + "exists": True, + "platforms": ["gmail", "github"], + } + + elif class_name == "SireneScanner": + # Organization scanner + outputs[output_name] = { + "name": inputs.get("input", "Example Corp"), + "siret": "12345678901234", + "address": "1 Example Street", + } + + else: + # For unknown scanners, pass through the input + outputs[output_name] = inputs.get("input") or f"flowed_{output_name}" + + return outputs diff --git a/flowsint-api/app/api/routes/investigations.py b/flowsint-api/app/api/routes/investigations.py index 8661e48..4bc0add 100644 --- a/flowsint-api/app/api/routes/investigations.py +++ b/flowsint-api/app/api/routes/investigations.py @@ -23,7 +23,10 @@ def get_investigations( db: Session = Depends(get_db), current_user: Profile = Depends(get_current_user) ): investigations = ( - db.query(Investigation).filter(Investigation.owner_id == current_user.id).all() + db.query(Investigation) + .options(selectinload(Investigation.sketches), selectinload(Investigation.analyses), selectinload(Investigation.owner)) + .filter(Investigation.owner_id == current_user.id) + .all() ) return investigations @@ -61,7 +64,7 @@ def get_investigation_by_id( ): investigation = ( db.query(Investigation) - .options(selectinload(Investigation.sketches)) + .options(selectinload(Investigation.sketches), selectinload(Investigation.analyses), selectinload(Investigation.owner)) .filter(Investigation.id == investigation_id) .filter(Investigation.owner_id == current_user.id) .first() diff --git a/flowsint-api/app/api/routes/transforms.py b/flowsint-api/app/api/routes/transforms.py index 475e3fa..d44d3ca 100644 --- a/flowsint-api/app/api/routes/transforms.py +++ b/flowsint-api/app/api/routes/transforms.py @@ -1,27 +1,11 @@ -from uuid import UUID, uuid4 -from fastapi import APIRouter, HTTPException, Depends, status, Query -from typing import Dict, List, Any, Optional +from fastapi import APIRouter, HTTPException, Depends, Query +from typing import List, Any, Optional from pydantic import BaseModel -from datetime import datetime -from flowsint_core.utils import extract_input_schema_transform -from flowsint_core.core.registry import ScannerRegistry +from flowsint_core.core.registry import TransformRegistry from flowsint_core.core.celery import celery -from flowsint_types import Domain, Phrase, Ip, SocialProfile, Organization, Email -from flowsint_core.core.types import Node, Edge, FlowStep, FlowBranch -from sqlalchemy.orm import Session -from flowsint_core.core.postgre_db import get_db -from app.models.models import Transform, Profile +from flowsint_core.core.types import Node, Edge, FlowBranch +from app.models.models import Profile from app.api.deps import get_current_user -from app.api.schemas.transform import TransformRead, TransformCreate, TransformUpdate -from flowsint_types import ( - ASN, - CIDR, - CryptoWallet, - CryptoWalletTransaction, - CryptoNFT, - Website, - Individual, -) class FlowComputationRequest(BaseModel): @@ -31,16 +15,16 @@ class FlowComputationRequest(BaseModel): class FlowComputationResponse(BaseModel): - transformBranches: List[FlowBranch] + flowBranches: List[FlowBranch] initialData: Any class StepSimulationRequest(BaseModel): - transformBranches: List[FlowBranch] + flowBranches: List[FlowBranch] currentStepIndex: int -class LaunchTransformPayload(BaseModel): +class launchTransformPayload(BaseModel): values: List[str] sketch_id: str @@ -49,180 +33,29 @@ router = APIRouter() # Get the list of all transforms -@router.get("", response_model=List[TransformRead]) +@router.get("/") def get_transforms( category: Optional[str] = Query(None), - db: Session = Depends(get_db), - current_user: Profile = Depends(get_current_user), + # current_user: Profile = Depends(get_current_user), ): - query = db.query(Transform) - if category is not None and category != "undefined": - # Case-insensitive filtering by checking if any category matches (case-insensitive) - transforms = query.all() - return [ - transform - for transform in transforms - if any(cat.lower() == category.lower() for cat in transform.category) - ] - - return query.order_by(Transform.last_updated_at.desc()).all() + transforms = TransformRegistry.list_by_input_type(category) + else: + transforms = TransformRegistry.list() + return transforms -# Returns the "raw_materials" for the transform editor -@router.get("/raw_materials") -async def get_material_list(): - scanners = ScannerRegistry.list_by_categories() - scanner_categories = { - category: [ - { - "class_name": scanner.get("class_name"), - "category": scanner.get("category"), - "name": scanner.get("name"), - "module": scanner.get("module"), - "documentation": scanner.get("documentation"), - "description": scanner.get("description"), - "inputs": scanner.get("inputs"), - "outputs": scanner.get("outputs"), - "type": "scanner", - "params": scanner.get("params"), - "params_schema": scanner.get("params_schema"), - "required_params": scanner.get("required_params"), - "icon": scanner.get("icon"), - } - for scanner in scanner_list - ] - for category, scanner_list in scanners.items() - } - - object_inputs = [ - extract_input_schema_transform(Phrase), - extract_input_schema_transform(Organization), - extract_input_schema_transform(Individual), - extract_input_schema_transform(Domain), - extract_input_schema_transform(Website), - extract_input_schema_transform(Ip), - extract_input_schema_transform(ASN), - extract_input_schema_transform(CIDR), - extract_input_schema_transform(SocialProfile), - extract_input_schema_transform(Email), - extract_input_schema_transform(CryptoWallet), - extract_input_schema_transform(CryptoWalletTransaction), - extract_input_schema_transform(CryptoNFT), - ] - - # Put types first, then add all scanner categories - flattened_scanners = {"types": object_inputs} - flattened_scanners.update(scanner_categories) - - return {"items": flattened_scanners} - - -# Returns the "raw_materials" for the transform editor -@router.get("/input_type/{input_type}") -async def get_material_list(input_type: str): - scanners = ScannerRegistry.list_by_input_type(input_type) - return {"items": scanners} - - -# Create a new transform -@router.post( - "/create", response_model=TransformRead, status_code=status.HTTP_201_CREATED -) -def create_transform( - payload: TransformCreate, - db: Session = Depends(get_db), - current_user: Profile = Depends(get_current_user), -): - - new_transform = Transform( - id=uuid4(), - name=payload.name, - description=payload.description, - category=payload.category, - transform_schema=payload.transform_schema, - created_at=datetime.utcnow(), - last_updated_at=datetime.utcnow(), - ) - db.add(new_transform) - db.commit() - db.refresh(new_transform) - return new_transform - - -# Get a transform by ID -@router.get("/{transform_id}", response_model=TransformRead) -def get_transform_by_id( - transform_id: UUID, - db: Session = Depends(get_db), - current_user: Profile = Depends(get_current_user), -): - transform = db.query(Transform).filter(Transform.id == transform_id).first() - if not transform: - raise HTTPException(status_code=404, detail="Transform not found") - return transform - - -# Update a transform by ID -@router.put("/{transform_id}", response_model=TransformRead) -def update_transform( - transform_id: UUID, - payload: TransformUpdate, - db: Session = Depends(get_db), - current_user: Profile = Depends(get_current_user), -): - transform = db.query(Transform).filter(Transform.id == transform_id).first() - if not transform: - raise HTTPException(status_code=404, detail="Transform not found") - update_data = payload.dict(exclude_unset=True) - for key, value in update_data.items(): - print(f"only update {key}") - if key == "category": - if "SocialProfile" in value: - value.append("Username") - setattr(transform, key, value) - - transform.last_updated_at = datetime.utcnow() - - db.commit() - db.refresh(transform) - return transform - - -# Delete a transform by ID -@router.delete("/{transform_id}", status_code=status.HTTP_204_NO_CONTENT) -def delete_transform( - transform_id: UUID, - db: Session = Depends(get_db), - current_user: Profile = Depends(get_current_user), -): - transform = db.query(Transform).filter(Transform.id == transform_id).first() - if not transform: - raise HTTPException(status_code=404, detail="Transform not found") - db.delete(transform) - db.commit() - return None - - -@router.post("/{transform_id}/launch") +@router.post("/{transform_name}/launch") async def launch_transform( - transform_id: str, - payload: LaunchTransformPayload, - db: Session = Depends(get_db), + transform_name: str, + payload: launchTransformPayload, current_user: Profile = Depends(get_current_user), ): try: - transform = db.query(Transform).filter(Transform.id == transform_id).first() - if transform is None: - raise HTTPException(status_code=404, detail="Transform not found") - nodes = [Node(**node) for node in transform.transform_schema["nodes"]] - edges = [Edge(**edge) for edge in transform.transform_schema["edges"]] - transform_branches = compute_transform_branches(payload.values, nodes, edges) - serializable_branches = [branch.model_dump() for branch in transform_branches] task = celery.send_task( "run_transform", args=[ - serializable_branches, + transform_name, payload.values, payload.sketch_id, str(current_user.id), @@ -233,322 +66,3 @@ async def launch_transform( except Exception as e: print(e) raise HTTPException(status_code=404, detail="Transform not found") - - -@router.post("/{transform_id}/compute", response_model=FlowComputationResponse) -def compute_transforms( - request: FlowComputationRequest, current_user: Profile = Depends(get_current_user) -): - initial_data = generate_sample_data(request.inputType or "string") - transform_branches = compute_transform_branches( - initial_data, request.nodes, request.edges - ) - return FlowComputationResponse( - transformBranches=transform_branches, initialData=initial_data - ) - - -def generate_sample_data(type_str: str) -> Any: - type_str = type_str.lower() if type_str else "string" - if type_str == "string": - return "sample_text" - elif type_str == "number": - return 42 - elif type_str == "boolean": - return True - elif type_str == "array": - return [1, 2, 3] - elif type_str == "object": - return {"key": "value"} - elif type_str == "url": - return "https://example.com" - elif type_str == "email": - return "user@example.com" - elif type_str == "domain": - return "example.com" - elif type_str == "ip": - return "192.168.1.1" - else: - return f"sample_{type_str}" - - -def compute_transform_branches( - initial_value: Any, nodes: List[Node], edges: List[Edge] -) -> List[FlowBranch]: - """Computes flow branches based on nodes and edges with proper DFS traversal""" - # Find input nodes (starting points) - input_nodes = [node for node in nodes if node.data.get("type") == "type"] - - if not input_nodes: - return [ - FlowBranch( - id="error", - name="Error", - steps=[ - FlowStep( - nodeId="error", - inputs={}, - type="error", - outputs={}, - status="error", - branchId="error", - depth=0, - ) - ], - ) - ] - - node_map = {node.id: node for node in nodes} - branches = [] - branch_counter = 0 - # Track scanner outputs across all branches - scanner_outputs = {} - - def calculate_path_length(start_node: str, visited: set = None) -> int: - """Calculate the shortest possible path length from a node to any leaf""" - if visited is None: - visited = set() - - if start_node in visited: - return float("inf") - - visited.add(start_node) - out_edges = [edge for edge in edges if edge.source == start_node] - - if not out_edges: - return 1 - - min_length = float("inf") - for edge in out_edges: - length = calculate_path_length(edge.target, visited.copy()) - min_length = min(min_length, length) - - return 1 + min_length - - def get_outgoing_edges(node_id: str) -> List[Edge]: - """Get outgoing edges sorted by the shortest possible path length""" - out_edges = [edge for edge in edges if edge.source == node_id] - # Sort edges by the length of the shortest possible path from their target - return sorted(out_edges, key=lambda e: calculate_path_length(e.target)) - - def create_step( - node_id: str, - branch_id: str, - depth: int, - input_data: Dict[str, Any], - is_input_node: bool, - outputs: Dict[str, Any], - node_params: Optional[Dict[str, Any]] = None, - ) -> FlowStep: - return FlowStep( - nodeId=node_id, - params=node_params, - inputs={} if is_input_node else input_data, - outputs=outputs, - type="type" if is_input_node else "scanner", - status="pending", - branchId=branch_id, - depth=depth, - ) - - def explore_branch( - current_node_id: str, - branch_id: str, - branch_name: str, - depth: int, - input_data: Dict[str, Any], - path: List[str], - branch_visited: set, - steps: List[FlowStep], - parent_outputs: Dict[str, Any] = None, - ) -> None: - nonlocal branch_counter - - # Skip if node is already in current path (cycle detection) - if current_node_id in path: - return - - current_node = node_map.get(current_node_id) - if not current_node: - return - - # Process node outputs - is_input_node = current_node.data.get("type") == "type" - if is_input_node: - outputs_array = current_node.data["outputs"].get("properties", []) - first_output_name = ( - outputs_array[0].get("name", "output") if outputs_array else "output" - ) - current_outputs = {first_output_name: initial_value} - else: - # Check if we already have outputs for this scanner - if current_node_id in scanner_outputs: - current_outputs = scanner_outputs[current_node_id] - else: - current_outputs = process_node_data(current_node, input_data) - # Store the outputs for future use - scanner_outputs[current_node_id] = current_outputs - - # Extract node parameters - node_params = current_node.data.get("params", {}) - - # Create and add current step - current_step = create_step( - current_node_id, - branch_id, - depth, - input_data, - is_input_node, - current_outputs, - node_params, - ) - steps.append(current_step) - path.append(current_node_id) - branch_visited.add(current_node_id) - - # Get all outgoing edges sorted by path length - out_edges = get_outgoing_edges(current_node_id) - - if not out_edges: - # Leaf node reached, save the branch - branches.append(FlowBranch(id=branch_id, name=branch_name, steps=steps[:])) - else: - # Process each outgoing edge in order of shortest path - for i, edge in enumerate(out_edges): - if edge.target in path: # Skip if would create cycle - continue - - # Prepare next node's input - output_key = edge.sourceHandle - if not output_key and current_outputs: - output_key = list(current_outputs.keys())[0] - - output_value = current_outputs.get(output_key) if output_key else None - if output_value is None and parent_outputs: - output_value = ( - parent_outputs.get(output_key) if output_key else None - ) - - next_input = {edge.targetHandle or "input": output_value} - - if i == 0: - # Continue in same branch (will be shortest path) - explore_branch( - edge.target, - branch_id, - branch_name, - depth + 1, - next_input, - path, - branch_visited, - steps, - current_outputs, - ) - else: - # Create new branch starting from current node - branch_counter += 1 - new_branch_id = f"{branch_id}-{branch_counter}" - new_branch_name = f"{branch_name} (Branch {branch_counter})" - new_steps = steps[: len(steps)] # Copy steps up to current node - new_branch_visited = ( - branch_visited.copy() - ) # Create new visited set for the branch - explore_branch( - edge.target, - new_branch_id, - new_branch_name, - depth + 1, - next_input, - path[:], # Create new path copy for branch - new_branch_visited, - new_steps, - current_outputs, - ) - - # Backtrack: remove current node from path and remove its step - path.pop() - steps.pop() - - # Start exploration from each input node - for index, input_node in enumerate(input_nodes): - branch_id = f"branch-{index}" - branch_name = f"Flow {index + 1}" if len(input_nodes) > 1 else "Main Flow" - explore_branch( - input_node.id, - branch_id, - branch_name, - 0, - {}, - [], # Use list for path to maintain order - set(), # Use set for visited to check membership - [], - None, - ) - - # Sort branches by length (number of steps) - branches.sort(key=lambda branch: len(branch.steps)) - return branches - - -def process_node_data(node: Node, inputs: Dict[str, Any]) -> Dict[str, Any]: - """Traite les données de nœud en fonction du type de nœud et des entrées""" - outputs = {} - output_types = node.data["outputs"].get("properties", []) - - for output in output_types: - output_name = output.get("name", "output") - class_name = node.data.get("class_name", "") - # For simulation purposes, we'll return a placeholder value based on the scanner type - if class_name in ["ReverseResolveScanner", "ResolveScanner"]: - # IP/Domain resolution scanners - outputs[output_name] = ( - "192.168.1.1" if "ip" in output_name.lower() else "example.com" - ) - elif class_name == "SubdomainScanner": - # Subdomain scanner - outputs[output_name] = f"sub.{inputs.get('input', 'example.com')}" - - elif class_name == "WhoisScanner": - # WHOIS scanner - outputs[output_name] = { - "domain": inputs.get("input", "example.com"), - "registrar": "Example Registrar", - "creation_date": "2020-01-01", - } - - elif class_name == "GeolocationScanner": - # Geolocation scanner - outputs[output_name] = { - "country": "France", - "city": "Paris", - "coordinates": {"lat": 48.8566, "lon": 2.3522}, - } - - elif class_name == "MaigretScanner": - # Social media scanner - outputs[output_name] = { - "username": inputs.get("input", "user123"), - "platforms": ["twitter", "github", "linkedin"], - } - - elif class_name == "HoleheScanner": - # Email verification scanner - outputs[output_name] = { - "email": inputs.get("input", "user@example.com"), - "exists": True, - "platforms": ["gmail", "github"], - } - - elif class_name == "SireneScanner": - # Organization scanner - outputs[output_name] = { - "name": inputs.get("input", "Example Corp"), - "siret": "12345678901234", - "address": "1 Example Street", - } - - else: - # For unknown scanners, pass through the input - outputs[output_name] = inputs.get("input") or f"transformed_{output_name}" - - return outputs diff --git a/flowsint-api/app/api/schemas/flow.py b/flowsint-api/app/api/schemas/flow.py new file mode 100644 index 0000000..5912698 --- /dev/null +++ b/flowsint-api/app/api/schemas/flow.py @@ -0,0 +1,29 @@ +from .base import ORMBase +from pydantic import UUID4, BaseModel +from typing import Optional +from datetime import datetime +from typing import List, Optional, Dict, Any + + +class FlowCreate(BaseModel): + name: str + description: Optional[str] = None + category: Optional[List[str]] = None + flow_schema: Optional[Dict[str, Any]] = None + + +class FlowRead(ORMBase): + id: UUID4 + name: str + description: Optional[str] + category: Optional[List[str]] + flow_schema: Optional[Dict[str, Any]] + created_at: datetime + last_updated_at: datetime + + +class FlowUpdate(BaseModel): + name: Optional[str] = None + description: Optional[str] = None + category: Optional[List[str]] = None + flow_schema: Optional[Dict[str, Any]] = None diff --git a/flowsint-api/app/api/schemas/investigation.py b/flowsint-api/app/api/schemas/investigation.py index cccb1c0..d9b5d89 100644 --- a/flowsint-api/app/api/schemas/investigation.py +++ b/flowsint-api/app/api/schemas/investigation.py @@ -3,6 +3,8 @@ from pydantic import UUID4, BaseModel from typing import Optional from datetime import datetime from .sketch import SketchRead +from .analysis import AnalysisRead +from .profile import ProfileRead class InvestigationCreate(BaseModel): @@ -20,7 +22,9 @@ class InvestigationRead(ORMBase): owner_id: Optional[UUID4] last_updated_at: datetime status: str + owner: Optional[ProfileRead] = None sketches: list[SketchRead] = [] + analyses: list[AnalysisRead] = [] class InvestigationProfileCreate(BaseModel): diff --git a/flowsint-api/app/api/schemas/transform.py b/flowsint-api/app/api/schemas/transform.py index bc28250..455266e 100644 --- a/flowsint-api/app/api/schemas/transform.py +++ b/flowsint-api/app/api/schemas/transform.py @@ -1,7 +1,6 @@ from .base import ORMBase from pydantic import UUID4, BaseModel from typing import Optional -from datetime import datetime from typing import List, Optional, Dict, Any @@ -9,21 +8,19 @@ class TransformCreate(BaseModel): name: str description: Optional[str] = None category: Optional[List[str]] = None - transform_schema: Optional[Dict[str, Any]] = None class TransformRead(ORMBase): id: UUID4 name: str + class_name: str description: Optional[str] category: Optional[List[str]] - transform_schema: Optional[Dict[str, Any]] - created_at: datetime - last_updated_at: datetime + flow_schema: Optional[Dict[str, Any]] class TransformUpdate(BaseModel): name: Optional[str] = None + class_name: str = None description: Optional[str] = None category: Optional[List[str]] = None - transform_schema: Optional[Dict[str, Any]] = None diff --git a/flowsint-api/app/main.py b/flowsint-api/app/main.py index 8a91b89..e33c641 100644 --- a/flowsint-api/app/main.py +++ b/flowsint-api/app/main.py @@ -1,7 +1,6 @@ from fastapi import FastAPI from flowsint_core.core.graph_db import Neo4jConnection import os -from typing import List from dotenv import load_dotenv from fastapi.middleware.cors import CORSMiddleware @@ -10,6 +9,7 @@ from app.api.routes import auth from app.api.routes import investigations from app.api.routes import sketches from app.api.routes import transforms +from app.api.routes import flows from app.api.routes import events from app.api.routes import analysis from app.api.routes import chat @@ -58,6 +58,7 @@ app.include_router( investigations.router, prefix="/api/investigations", tags=["investigations"] ) app.include_router(transforms.router, prefix="/api/transforms", tags=["transforms"]) +app.include_router(flows.router, prefix="/api/flows", tags=["flows"]) app.include_router(events.router, prefix="/api/events", tags=["events"]) app.include_router(analysis.router, prefix="/api/analyses", tags=["analyses"]) app.include_router(chat.router, prefix="/api/chats", tags=["chats"]) diff --git a/flowsint-api/app/models/models.py b/flowsint-api/app/models/models.py index 86cd56f..34094d6 100644 --- a/flowsint-api/app/models/models.py +++ b/flowsint-api/app/models/models.py @@ -50,6 +50,7 @@ class Investigation(Base): sketches = relationship("Sketch", back_populates="investigation") analyses = relationship("Analysis", back_populates="investigation") chats = relationship("Chat", back_populates="investigation") + owner = relationship("Profile", foreign_keys=[owner_id]) __table_args__ = ( Index("idx_investigations_id", "id"), Index("idx_investigations_owner_id", "owner_id"), @@ -194,8 +195,8 @@ class SketchesProfiles(Base): ) -class Transform(Base): - __tablename__ = "transforms" +class Flow(Base): + __tablename__ = "flows" id: Mapped[uuid.UUID] = mapped_column( PGUUID(as_uuid=True), primary_key=True, default=uuid.uuid4 @@ -203,7 +204,7 @@ class Transform(Base): name = mapped_column(Text, nullable=False) description = mapped_column(Text, nullable=True) category = mapped_column(ARRAY(Text), nullable=True) - transform_schema = mapped_column(JSON, nullable=True) + flow_schema = mapped_column(JSON, nullable=True) created_at = mapped_column(DateTime(timezone=True), server_default=func.now()) last_updated_at = mapped_column(DateTime(timezone=True), server_default=func.now()) diff --git a/flowsint-api/app/utils.py b/flowsint-api/app/utils.py index 238eed7..5a6441a 100644 --- a/flowsint-api/app/utils.py +++ b/flowsint-api/app/utils.py @@ -159,7 +159,7 @@ def resolve_type(details: dict, schema_context: dict = None) -> str: return "any" -def extract_input_schema_transform(model: Type[BaseModel]) -> Dict[str, Any]: +def extract_input_schema_flow(model: Type[BaseModel]) -> Dict[str, Any]: adapter = TypeAdapter(model) schema = adapter.json_schema() @@ -225,7 +225,7 @@ def extract_transform(transform: Dict[str, Any]) -> Dict[str, Any]: if scanner_node and scanner_node["data"]["type"] == "scanner": scanners.append( { - "scanner_name": scanner_node["data"]["name"], + "transform_name": scanner_node["data"]["name"], "module": scanner_node["data"]["module"], "input": source_handle, "output": target_handle, @@ -238,7 +238,7 @@ def extract_transform(transform: Dict[str, Any]) -> Dict[str, Any]: "outputs": input_output, }, "scanners": scanners, - "scanner_names": [scanner["scanner_name"] for scanner in scanners], + "transform_names": [scanner["transform_name"] for scanner in scanners], } diff --git a/flowsint-app/src/renderer/index.html b/flowsint-app/src/renderer/index.html index c41e657..153b424 100644 --- a/flowsint-app/src/renderer/index.html +++ b/flowsint-app/src/renderer/index.html @@ -1,11 +1,11 @@
- + diff --git a/flowsint-app/src/renderer/src/api/flow-service.ts b/flowsint-app/src/renderer/src/api/flow-service.ts new file mode 100644 index 0000000..0efb6f3 --- /dev/null +++ b/flowsint-app/src/renderer/src/api/flow-service.ts @@ -0,0 +1,54 @@ +import { fetchWithAuth } from './api'; + +export const flowService = { + get: async (type?: string): PromiseDelete Transform
+Delete flow