refactor: rename transform to flow

This commit is contained in:
dextmorgn
2025-08-15 16:37:36 +02:00
parent 7620a6d145
commit 5f0b0abe3f
71 changed files with 2785 additions and 1337 deletions

View File

@@ -0,0 +1,36 @@
"""rename_transforms_to_flows
Revision ID: 661ff8ef4425
Revises: 9a3b9a199aa8
Create Date: 2025-08-15 16:16:12.792775
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '661ff8ef4425'
down_revision: Union[str, None] = '9a3b9a199aa8'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Upgrade schema: rename table 'transforms' -> 'flows'.

    The table is renamed first so that the follow-up column rename can
    target the new table name.
    """
    # Rename the table from 'transforms' to 'flows'
    op.rename_table('transforms', 'flows')
    # Rename the column from 'transform_schema' to 'flow_schema'
    op.alter_column('flows', 'transform_schema', new_column_name='flow_schema')
def downgrade() -> None:
    """Downgrade schema: undo upgrade() in reverse order.

    The column is renamed back first (while the table is still called
    'flows'), then the table itself is renamed back.
    """
    # Rename the column back from 'flow_schema' to 'transform_schema'
    op.alter_column('flows', 'flow_schema', new_column_name='transform_schema')
    # Rename the table back from 'flows' to 'transforms'
    op.rename_table('flows', 'transforms')

View File

@@ -0,0 +1,548 @@
from uuid import UUID, uuid4
from fastapi import APIRouter, HTTPException, Depends, status, Query
from typing import Dict, List, Any, Optional
from pydantic import BaseModel
from datetime import datetime
from flowsint_core.utils import extract_input_schema_flow
from flowsint_core.core.registry import TransformRegistry
from flowsint_core.core.celery import celery
from flowsint_types import Domain, Phrase, Ip, SocialProfile, Organization, Email
from flowsint_core.core.types import Node, Edge, FlowStep, FlowBranch
from sqlalchemy.orm import Session
from flowsint_core.core.postgre_db import get_db
from app.models.models import Flow, Profile
from app.api.deps import get_current_user
from app.api.schemas.flow import FlowRead, FlowCreate, FlowUpdate
from flowsint_types import (
ASN,
CIDR,
CryptoWallet,
CryptoWalletTransaction,
CryptoNFT,
Website,
Individual,
)
class FlowComputationRequest(BaseModel):
    """Request body for POST /{flow_id}/compute: the editor graph to simulate."""

    nodes: List[Node]
    edges: List[Edge]
    # NOTE(review): camelCase field name — presumably mirrors the frontend
    # payload; confirm before normalizing to snake_case.
    inputType: Optional[str] = None
class FlowComputationResponse(BaseModel):
    """Simulation result: computed branches plus the generated seed value."""

    flowBranches: List[FlowBranch]
    initialData: Any
class StepSimulationRequest(BaseModel):
    """Branches plus a cursor for step-by-step simulation.

    NOTE(review): not referenced by any route in this file — confirm it is
    used elsewhere before removing.
    """

    flowBranches: List[FlowBranch]
    currentStepIndex: int
class launchFlowPayload(BaseModel):
    """Request body for POST /{flow_id}/launch."""

    # NOTE(review): class name breaks PEP 8 PascalCase — consider renaming to
    # LaunchFlowPayload (launch_flow's annotation must change with it).
    values: List[str]  # seed values handed to the flow's input node(s)
    sketch_id: str  # sketch that receives the run's results


router = APIRouter()
# Get the list of all flows
@router.get("/", response_model=List[FlowRead])
def get_flows(
    category: Optional[str] = Query(None),
    db: Session = Depends(get_db),
    current_user: Profile = Depends(get_current_user),
):
    """Return all flows, newest first, optionally filtered by category.

    Filtering is case-insensitive and done in Python because ``category`` is a
    list column. Fix over the original: flows with a NULL category no longer
    raise a TypeError, and the filtered branch now honors the same
    ``last_updated_at`` ordering as the unfiltered one.
    """
    query = db.query(Flow).order_by(Flow.last_updated_at.desc())
    if category is not None and category != "undefined":
        wanted = category.lower()
        return [
            flow
            for flow in query.all()
            # `or []` guards against rows whose category is NULL.
            if any(cat.lower() == wanted for cat in (flow.category or []))
        ]
    return query.all()
# Returns the "raw_materials" for the flow editor
@router.get("/raw_materials")
async def get_material_list():
    """Assemble the flow editor's palette.

    The response maps each scanner category to its scanner descriptors, with a
    synthetic "types" category (the entity input schemas) listed first.
    """

    def _describe(scanner):
        # Project a registry entry onto exactly the fields the editor consumes,
        # preserving the historical key order of the response.
        entry = {
            key: scanner.get(key)
            for key in (
                "class_name",
                "category",
                "name",
                "module",
                "documentation",
                "description",
                "inputs",
                "outputs",
            )
        }
        entry["type"] = "scanner"
        for key in ("params", "params_schema", "required_params", "icon"):
            entry[key] = scanner.get(key)
        return entry

    categorized = {
        category: [_describe(scanner) for scanner in scanners]
        for category, scanners in TransformRegistry.list_by_categories().items()
    }
    type_schemas = [
        extract_input_schema_flow(entity)
        for entity in (
            Phrase,
            Organization,
            Individual,
            Domain,
            Website,
            Ip,
            ASN,
            CIDR,
            SocialProfile,
            Email,
            CryptoWallet,
            CryptoWalletTransaction,
            CryptoNFT,
        )
    ]
    # Put types first, then add all scanner categories.
    items = {"types": type_schemas}
    items.update(categorized)
    return {"items": items}
# Look up the transforms that accept a given input type
@router.get("/input_type/{input_type}")
async def get_transforms_by_input_type(input_type: str):
    """Return registered transforms whose input schema matches *input_type*.

    Renamed from ``get_material_list``: the previous name shadowed the
    ``/raw_materials`` handler above (duplicate module-level definition) and
    produced a colliding OpenAPI operation id. The route path is unchanged.
    """
    transforms = TransformRegistry.list_by_input_type(input_type)
    return {"items": transforms}
# Create a new flow
@router.post("/create", response_model=FlowRead, status_code=status.HTTP_201_CREATED)
def create_flow(
    payload: FlowCreate,
    db: Session = Depends(get_db),
    current_user: Profile = Depends(get_current_user),
):
    """Persist a new flow built from *payload* and return the stored row."""
    flow = Flow(
        id=uuid4(),
        name=payload.name,
        description=payload.description,
        category=payload.category,
        flow_schema=payload.flow_schema,
        created_at=datetime.utcnow(),
        last_updated_at=datetime.utcnow(),
    )
    db.add(flow)
    db.commit()
    # Re-read so server-side defaults are reflected in the response.
    db.refresh(flow)
    return flow
# Get a flow by ID
@router.get("/{flow_id}", response_model=FlowRead)
def get_flow_by_id(
    flow_id: UUID,
    db: Session = Depends(get_db),
    current_user: Profile = Depends(get_current_user),
):
    """Fetch a single flow by primary key, raising 404 when absent."""
    flow = db.query(Flow).filter(Flow.id == flow_id).first()
    if flow is None:
        raise HTTPException(status_code=404, detail="flow not found")
    return flow
# Update a flow by ID
@router.put("/{flow_id}", response_model=FlowRead)
def update_flow(
    flow_id: UUID,
    payload: FlowUpdate,
    db: Session = Depends(get_db),
    current_user: Profile = Depends(get_current_user),
):
    """Apply the fields explicitly set on *payload* to an existing flow.

    PATCH-like semantics: fields the client did not send keep their current
    value. Raises 404 when the flow does not exist.

    Fixes over the original: removed a leftover debug ``print``; switched the
    deprecated pydantic-v1 ``payload.dict()`` to ``model_dump`` (consistent
    with the launch endpoint); guarded the category tweak against an explicit
    ``category=None``, which previously raised a TypeError.
    """
    flow = db.query(Flow).filter(Flow.id == flow_id).first()
    if not flow:
        raise HTTPException(status_code=404, detail="flow not found")
    update_data = payload.model_dump(exclude_unset=True)
    for key, value in update_data.items():
        if key == "category" and value and "SocialProfile" in value:
            # SocialProfile flows are also listed under the "Username" category.
            value.append("Username")
        setattr(flow, key, value)
    flow.last_updated_at = datetime.utcnow()
    db.commit()
    db.refresh(flow)
    return flow
# Delete a flow by ID
@router.delete("/{flow_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_flow(
    flow_id: UUID,
    db: Session = Depends(get_db),
    current_user: Profile = Depends(get_current_user),
):
    """Permanently remove a flow; 404 when it does not exist."""
    flow = db.query(Flow).filter(Flow.id == flow_id).first()
    if flow is None:
        raise HTTPException(status_code=404, detail="flow not found")
    db.delete(flow)
    db.commit()
    return None
@router.post("/{flow_id}/launch")
async def launch_flow(
    flow_id: str,
    payload: launchFlowPayload,
    db: Session = Depends(get_db),
    current_user: Profile = Depends(get_current_user),
):
    """Queue a flow run on Celery and return the task id.

    Looks up the flow, rebuilds its graph from ``flow_schema``, pre-computes
    the execution branches and hands them to the ``run_flow`` worker task.

    Fix over the original: the blanket ``except Exception`` used to re-map
    every failure (broken schema, broker outage, even the 404 itself) to
    404 "flow not found". The lookup now happens outside the try block and
    internal failures surface as 500.
    """
    flow = db.query(Flow).filter(Flow.id == flow_id).first()
    if flow is None:
        raise HTTPException(status_code=404, detail="flow not found")
    try:
        nodes = [Node(**node) for node in flow.flow_schema["nodes"]]
        edges = [Edge(**edge) for edge in flow.flow_schema["edges"]]
        flow_branches = compute_flow_branches(payload.values, nodes, edges)
        # Celery arguments must be JSON-serializable, hence model_dump().
        serializable_branches = [branch.model_dump() for branch in flow_branches]
        task = celery.send_task(
            "run_flow",
            args=[
                serializable_branches,
                payload.values,
                payload.sketch_id,
                str(current_user.id),
            ],
        )
        return {"id": task.id}
    except Exception as e:
        print(e)
        raise HTTPException(status_code=500, detail="failed to launch flow") from e
@router.post("/{flow_id}/compute", response_model=FlowComputationResponse)
def compute_flows(
    request: FlowComputationRequest, current_user: Profile = Depends(get_current_user)
):
    """Simulate a flow graph using generated sample data as the seed value."""
    seed = generate_sample_data(request.inputType or "string")
    branches = compute_flow_branches(seed, request.nodes, request.edges)
    return FlowComputationResponse(flowBranches=branches, initialData=seed)
def generate_sample_data(type_str: str) -> Any:
    """Return a representative placeholder value for *type_str*.

    Matching is case-insensitive; a falsy *type_str* (None/"") is treated as
    "string", and unknown types fall back to ``"sample_<type>"``.
    """
    samples = {
        "string": "sample_text",
        "number": 42,
        "boolean": True,
        "array": [1, 2, 3],
        "object": {"key": "value"},
        "url": "https://example.com",
        "email": "user@example.com",
        "domain": "example.com",
        "ip": "192.168.1.1",
    }
    normalized = type_str.lower() if type_str else "string"
    return samples.get(normalized, f"sample_{normalized}")
def compute_flow_branches(
    initial_value: Any, nodes: List[Node], edges: List[Edge]
) -> List[FlowBranch]:
    """Computes flow branches based on nodes and edges with proper DFS traversal.

    Starting from every "type" (input) node, walks the graph depth-first and
    produces one FlowBranch per root-to-leaf path. At a fork, the edge whose
    target is closest to a leaf continues the current branch; every other edge
    spawns a new branch that inherits the steps accumulated so far. Returns
    the branches sorted by step count, or a single synthetic "error" branch
    when the graph has no input node.
    """
    # Find input nodes (starting points)
    input_nodes = [node for node in nodes if node.data.get("type") == "type"]
    if not input_nodes:
        # No entry point: surface a one-step error branch instead of raising.
        return [
            FlowBranch(
                id="error",
                name="Error",
                steps=[
                    FlowStep(
                        nodeId="error",
                        inputs={},
                        type="error",
                        outputs={},
                        status="error",
                        branchId="error",
                        depth=0,
                    )
                ],
            )
        ]
    node_map = {node.id: node for node in nodes}
    branches = []
    branch_counter = 0
    # Track scanner outputs across all branches
    # (memoized per node id: a scanner reached via several paths keeps one result).
    scanner_outputs = {}

    def calculate_path_length(start_node: str, visited: Optional[set] = None) -> float:
        """Calculate the shortest possible path length from a node to any leaf."""
        if visited is None:
            visited = set()
        if start_node in visited:
            # Cycle: treat as unreachable so this route is never preferred.
            return float("inf")
        visited.add(start_node)
        out_edges = [edge for edge in edges if edge.source == start_node]
        if not out_edges:
            return 1
        min_length = float("inf")
        for edge in out_edges:
            length = calculate_path_length(edge.target, visited.copy())
            min_length = min(min_length, length)
        return 1 + min_length

    def get_outgoing_edges(node_id: str) -> List[Edge]:
        """Get outgoing edges sorted by the shortest possible path length."""
        out_edges = [edge for edge in edges if edge.source == node_id]
        # Sort edges by the length of the shortest possible path from their target
        return sorted(out_edges, key=lambda e: calculate_path_length(e.target))

    def create_step(
        node_id: str,
        branch_id: str,
        depth: int,
        input_data: Dict[str, Any],
        is_input_node: bool,
        outputs: Dict[str, Any],
        node_params: Optional[Dict[str, Any]] = None,
    ) -> FlowStep:
        """Build a FlowStep; input nodes carry no inputs and everything starts pending."""
        return FlowStep(
            nodeId=node_id,
            params=node_params,
            inputs={} if is_input_node else input_data,
            outputs=outputs,
            type="type" if is_input_node else "scanner",
            status="pending",
            branchId=branch_id,
            depth=depth,
        )

    def explore_branch(
        current_node_id: str,
        branch_id: str,
        branch_name: str,
        depth: int,
        input_data: Dict[str, Any],
        path: List[str],
        branch_visited: set,
        steps: List[FlowStep],
        parent_outputs: Optional[Dict[str, Any]] = None,
    ) -> None:
        """DFS worker: extend the current branch, forking at multi-edge nodes."""
        nonlocal branch_counter
        # Skip if node is already in current path (cycle detection)
        if current_node_id in path:
            return
        current_node = node_map.get(current_node_id)
        if not current_node:
            # Dangling edge target: ignore silently.
            return
        # Process node outputs
        is_input_node = current_node.data.get("type") == "type"
        if is_input_node:
            # Seed the branch: the input node's first declared output carries
            # the caller-provided initial value.
            outputs_array = current_node.data["outputs"].get("properties", [])
            first_output_name = (
                outputs_array[0].get("name", "output") if outputs_array else "output"
            )
            current_outputs = {first_output_name: initial_value}
        else:
            # Check if we already have outputs for this scanner
            if current_node_id in scanner_outputs:
                current_outputs = scanner_outputs[current_node_id]
            else:
                current_outputs = process_node_data(current_node, input_data)
                # Store the outputs for future use
                scanner_outputs[current_node_id] = current_outputs
        # Extract node parameters
        node_params = current_node.data.get("params", {})
        # Create and add current step
        current_step = create_step(
            current_node_id,
            branch_id,
            depth,
            input_data,
            is_input_node,
            current_outputs,
            node_params,
        )
        steps.append(current_step)
        path.append(current_node_id)
        branch_visited.add(current_node_id)
        # NOTE(review): branch_visited is populated but never consulted; `path`
        # alone performs the cycle check. Confirm before removing it.
        # Get all outgoing edges sorted by path length
        out_edges = get_outgoing_edges(current_node_id)
        if not out_edges:
            # Leaf node reached, save the branch
            branches.append(FlowBranch(id=branch_id, name=branch_name, steps=steps[:]))
        else:
            # Process each outgoing edge in order of shortest path
            for i, edge in enumerate(out_edges):
                if edge.target in path:  # Skip if would create cycle
                    continue
                # Prepare next node's input
                output_key = edge.sourceHandle
                if not output_key and current_outputs:
                    # No explicit handle: default to the first output key.
                    output_key = list(current_outputs.keys())[0]
                output_value = current_outputs.get(output_key) if output_key else None
                if output_value is None and parent_outputs:
                    # Fall back to the previous node's outputs for pass-through handles.
                    output_value = (
                        parent_outputs.get(output_key) if output_key else None
                    )
                next_input = {edge.targetHandle or "input": output_value}
                if i == 0:
                    # Continue in same branch (will be shortest path)
                    explore_branch(
                        edge.target,
                        branch_id,
                        branch_name,
                        depth + 1,
                        next_input,
                        path,
                        branch_visited,
                        steps,
                        current_outputs,
                    )
                else:
                    # Create new branch starting from current node
                    branch_counter += 1
                    new_branch_id = f"{branch_id}-{branch_counter}"
                    new_branch_name = f"{branch_name} (Branch {branch_counter})"
                    new_steps = steps[: len(steps)]  # Copy steps up to current node
                    new_branch_visited = (
                        branch_visited.copy()
                    )  # Create new visited set for the branch
                    explore_branch(
                        edge.target,
                        new_branch_id,
                        new_branch_name,
                        depth + 1,
                        next_input,
                        path[:],  # Create new path copy for branch
                        new_branch_visited,
                        new_steps,
                        current_outputs,
                    )
        # Backtrack: remove current node from path and remove its step
        path.pop()
        steps.pop()

    # Start exploration from each input node
    for index, input_node in enumerate(input_nodes):
        branch_id = f"branch-{index}"
        branch_name = f"Flow {index + 1}" if len(input_nodes) > 1 else "Main Flow"
        explore_branch(
            input_node.id,
            branch_id,
            branch_name,
            0,
            {},
            [],  # Use list for path to maintain order
            set(),  # Use set for visited to check membership
            [],
            None,
        )
    # Sort branches by length (number of steps)
    branches.sort(key=lambda branch: len(branch.steps))
    return branches
def process_node_data(node: Node, inputs: Dict[str, Any]) -> Dict[str, Any]:
    """Produce simulated outputs for a scanner node from its inputs.

    For simulation purposes only: each known scanner class gets a canned
    placeholder value; unknown scanners pass their input through.
    """
    results: Dict[str, Any] = {}
    scanner = node.data.get("class_name", "")
    for spec in node.data["outputs"].get("properties", []):
        name = spec.get("name", "output")
        if scanner in ("ReverseResolveScanner", "ResolveScanner"):
            # IP/Domain resolution scanners
            results[name] = (
                "192.168.1.1" if "ip" in name.lower() else "example.com"
            )
        elif scanner == "SubdomainScanner":
            # Subdomain scanner
            results[name] = f"sub.{inputs.get('input', 'example.com')}"
        elif scanner == "WhoisScanner":
            # WHOIS scanner
            results[name] = {
                "domain": inputs.get("input", "example.com"),
                "registrar": "Example Registrar",
                "creation_date": "2020-01-01",
            }
        elif scanner == "GeolocationScanner":
            # Geolocation scanner
            results[name] = {
                "country": "France",
                "city": "Paris",
                "coordinates": {"lat": 48.8566, "lon": 2.3522},
            }
        elif scanner == "MaigretScanner":
            # Social media scanner
            results[name] = {
                "username": inputs.get("input", "user123"),
                "platforms": ["twitter", "github", "linkedin"],
            }
        elif scanner == "HoleheScanner":
            # Email verification scanner
            results[name] = {
                "email": inputs.get("input", "user@example.com"),
                "exists": True,
                "platforms": ["gmail", "github"],
            }
        elif scanner == "SireneScanner":
            # Organization scanner
            results[name] = {
                "name": inputs.get("input", "Example Corp"),
                "siret": "12345678901234",
                "address": "1 Example Street",
            }
        else:
            # For unknown scanners, pass through the input
            results[name] = inputs.get("input") or f"flowed_{name}"
    return results

View File

@@ -23,7 +23,10 @@ def get_investigations(
db: Session = Depends(get_db), current_user: Profile = Depends(get_current_user)
):
investigations = (
db.query(Investigation).filter(Investigation.owner_id == current_user.id).all()
db.query(Investigation)
.options(selectinload(Investigation.sketches), selectinload(Investigation.analyses), selectinload(Investigation.owner))
.filter(Investigation.owner_id == current_user.id)
.all()
)
return investigations
@@ -61,7 +64,7 @@ def get_investigation_by_id(
):
investigation = (
db.query(Investigation)
.options(selectinload(Investigation.sketches))
.options(selectinload(Investigation.sketches), selectinload(Investigation.analyses), selectinload(Investigation.owner))
.filter(Investigation.id == investigation_id)
.filter(Investigation.owner_id == current_user.id)
.first()

View File

@@ -1,27 +1,11 @@
from uuid import UUID, uuid4
from fastapi import APIRouter, HTTPException, Depends, status, Query
from typing import Dict, List, Any, Optional
from fastapi import APIRouter, HTTPException, Depends, Query
from typing import List, Any, Optional
from pydantic import BaseModel
from datetime import datetime
from flowsint_core.utils import extract_input_schema_transform
from flowsint_core.core.registry import ScannerRegistry
from flowsint_core.core.registry import TransformRegistry
from flowsint_core.core.celery import celery
from flowsint_types import Domain, Phrase, Ip, SocialProfile, Organization, Email
from flowsint_core.core.types import Node, Edge, FlowStep, FlowBranch
from sqlalchemy.orm import Session
from flowsint_core.core.postgre_db import get_db
from app.models.models import Transform, Profile
from flowsint_core.core.types import Node, Edge, FlowBranch
from app.models.models import Profile
from app.api.deps import get_current_user
from app.api.schemas.transform import TransformRead, TransformCreate, TransformUpdate
from flowsint_types import (
ASN,
CIDR,
CryptoWallet,
CryptoWalletTransaction,
CryptoNFT,
Website,
Individual,
)
class FlowComputationRequest(BaseModel):
@@ -31,16 +15,16 @@ class FlowComputationRequest(BaseModel):
class FlowComputationResponse(BaseModel):
transformBranches: List[FlowBranch]
flowBranches: List[FlowBranch]
initialData: Any
class StepSimulationRequest(BaseModel):
transformBranches: List[FlowBranch]
flowBranches: List[FlowBranch]
currentStepIndex: int
class LaunchTransformPayload(BaseModel):
class launchTransformPayload(BaseModel):
values: List[str]
sketch_id: str
@@ -49,180 +33,29 @@ router = APIRouter()
# Get the list of all transforms
@router.get("", response_model=List[TransformRead])
@router.get("/")
def get_transforms(
category: Optional[str] = Query(None),
db: Session = Depends(get_db),
current_user: Profile = Depends(get_current_user),
# current_user: Profile = Depends(get_current_user),
):
query = db.query(Transform)
if category is not None and category != "undefined":
# Case-insensitive filtering by checking if any category matches (case-insensitive)
transforms = query.all()
return [
transform
for transform in transforms
if any(cat.lower() == category.lower() for cat in transform.category)
]
return query.order_by(Transform.last_updated_at.desc()).all()
transforms = TransformRegistry.list_by_input_type(category)
else:
transforms = TransformRegistry.list()
return transforms
# Returns the "raw_materials" for the transform editor
@router.get("/raw_materials")
async def get_material_list():
scanners = ScannerRegistry.list_by_categories()
scanner_categories = {
category: [
{
"class_name": scanner.get("class_name"),
"category": scanner.get("category"),
"name": scanner.get("name"),
"module": scanner.get("module"),
"documentation": scanner.get("documentation"),
"description": scanner.get("description"),
"inputs": scanner.get("inputs"),
"outputs": scanner.get("outputs"),
"type": "scanner",
"params": scanner.get("params"),
"params_schema": scanner.get("params_schema"),
"required_params": scanner.get("required_params"),
"icon": scanner.get("icon"),
}
for scanner in scanner_list
]
for category, scanner_list in scanners.items()
}
object_inputs = [
extract_input_schema_transform(Phrase),
extract_input_schema_transform(Organization),
extract_input_schema_transform(Individual),
extract_input_schema_transform(Domain),
extract_input_schema_transform(Website),
extract_input_schema_transform(Ip),
extract_input_schema_transform(ASN),
extract_input_schema_transform(CIDR),
extract_input_schema_transform(SocialProfile),
extract_input_schema_transform(Email),
extract_input_schema_transform(CryptoWallet),
extract_input_schema_transform(CryptoWalletTransaction),
extract_input_schema_transform(CryptoNFT),
]
# Put types first, then add all scanner categories
flattened_scanners = {"types": object_inputs}
flattened_scanners.update(scanner_categories)
return {"items": flattened_scanners}
# Returns the "raw_materials" for the transform editor
@router.get("/input_type/{input_type}")
async def get_material_list(input_type: str):
scanners = ScannerRegistry.list_by_input_type(input_type)
return {"items": scanners}
# Create a new transform
@router.post(
"/create", response_model=TransformRead, status_code=status.HTTP_201_CREATED
)
def create_transform(
payload: TransformCreate,
db: Session = Depends(get_db),
current_user: Profile = Depends(get_current_user),
):
new_transform = Transform(
id=uuid4(),
name=payload.name,
description=payload.description,
category=payload.category,
transform_schema=payload.transform_schema,
created_at=datetime.utcnow(),
last_updated_at=datetime.utcnow(),
)
db.add(new_transform)
db.commit()
db.refresh(new_transform)
return new_transform
# Get a transform by ID
@router.get("/{transform_id}", response_model=TransformRead)
def get_transform_by_id(
transform_id: UUID,
db: Session = Depends(get_db),
current_user: Profile = Depends(get_current_user),
):
transform = db.query(Transform).filter(Transform.id == transform_id).first()
if not transform:
raise HTTPException(status_code=404, detail="Transform not found")
return transform
# Update a transform by ID
@router.put("/{transform_id}", response_model=TransformRead)
def update_transform(
transform_id: UUID,
payload: TransformUpdate,
db: Session = Depends(get_db),
current_user: Profile = Depends(get_current_user),
):
transform = db.query(Transform).filter(Transform.id == transform_id).first()
if not transform:
raise HTTPException(status_code=404, detail="Transform not found")
update_data = payload.dict(exclude_unset=True)
for key, value in update_data.items():
print(f"only update {key}")
if key == "category":
if "SocialProfile" in value:
value.append("Username")
setattr(transform, key, value)
transform.last_updated_at = datetime.utcnow()
db.commit()
db.refresh(transform)
return transform
# Delete a transform by ID
@router.delete("/{transform_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_transform(
transform_id: UUID,
db: Session = Depends(get_db),
current_user: Profile = Depends(get_current_user),
):
transform = db.query(Transform).filter(Transform.id == transform_id).first()
if not transform:
raise HTTPException(status_code=404, detail="Transform not found")
db.delete(transform)
db.commit()
return None
@router.post("/{transform_id}/launch")
@router.post("/{transform_name}/launch")
async def launch_transform(
transform_id: str,
payload: LaunchTransformPayload,
db: Session = Depends(get_db),
transform_name: str,
payload: launchTransformPayload,
current_user: Profile = Depends(get_current_user),
):
try:
transform = db.query(Transform).filter(Transform.id == transform_id).first()
if transform is None:
raise HTTPException(status_code=404, detail="Transform not found")
nodes = [Node(**node) for node in transform.transform_schema["nodes"]]
edges = [Edge(**edge) for edge in transform.transform_schema["edges"]]
transform_branches = compute_transform_branches(payload.values, nodes, edges)
serializable_branches = [branch.model_dump() for branch in transform_branches]
task = celery.send_task(
"run_transform",
args=[
serializable_branches,
transform_name,
payload.values,
payload.sketch_id,
str(current_user.id),
@@ -233,322 +66,3 @@ async def launch_transform(
except Exception as e:
print(e)
raise HTTPException(status_code=404, detail="Transform not found")
@router.post("/{transform_id}/compute", response_model=FlowComputationResponse)
def compute_transforms(
request: FlowComputationRequest, current_user: Profile = Depends(get_current_user)
):
initial_data = generate_sample_data(request.inputType or "string")
transform_branches = compute_transform_branches(
initial_data, request.nodes, request.edges
)
return FlowComputationResponse(
transformBranches=transform_branches, initialData=initial_data
)
def generate_sample_data(type_str: str) -> Any:
type_str = type_str.lower() if type_str else "string"
if type_str == "string":
return "sample_text"
elif type_str == "number":
return 42
elif type_str == "boolean":
return True
elif type_str == "array":
return [1, 2, 3]
elif type_str == "object":
return {"key": "value"}
elif type_str == "url":
return "https://example.com"
elif type_str == "email":
return "user@example.com"
elif type_str == "domain":
return "example.com"
elif type_str == "ip":
return "192.168.1.1"
else:
return f"sample_{type_str}"
def compute_transform_branches(
initial_value: Any, nodes: List[Node], edges: List[Edge]
) -> List[FlowBranch]:
"""Computes flow branches based on nodes and edges with proper DFS traversal"""
# Find input nodes (starting points)
input_nodes = [node for node in nodes if node.data.get("type") == "type"]
if not input_nodes:
return [
FlowBranch(
id="error",
name="Error",
steps=[
FlowStep(
nodeId="error",
inputs={},
type="error",
outputs={},
status="error",
branchId="error",
depth=0,
)
],
)
]
node_map = {node.id: node for node in nodes}
branches = []
branch_counter = 0
# Track scanner outputs across all branches
scanner_outputs = {}
def calculate_path_length(start_node: str, visited: set = None) -> int:
"""Calculate the shortest possible path length from a node to any leaf"""
if visited is None:
visited = set()
if start_node in visited:
return float("inf")
visited.add(start_node)
out_edges = [edge for edge in edges if edge.source == start_node]
if not out_edges:
return 1
min_length = float("inf")
for edge in out_edges:
length = calculate_path_length(edge.target, visited.copy())
min_length = min(min_length, length)
return 1 + min_length
def get_outgoing_edges(node_id: str) -> List[Edge]:
"""Get outgoing edges sorted by the shortest possible path length"""
out_edges = [edge for edge in edges if edge.source == node_id]
# Sort edges by the length of the shortest possible path from their target
return sorted(out_edges, key=lambda e: calculate_path_length(e.target))
def create_step(
node_id: str,
branch_id: str,
depth: int,
input_data: Dict[str, Any],
is_input_node: bool,
outputs: Dict[str, Any],
node_params: Optional[Dict[str, Any]] = None,
) -> FlowStep:
return FlowStep(
nodeId=node_id,
params=node_params,
inputs={} if is_input_node else input_data,
outputs=outputs,
type="type" if is_input_node else "scanner",
status="pending",
branchId=branch_id,
depth=depth,
)
def explore_branch(
current_node_id: str,
branch_id: str,
branch_name: str,
depth: int,
input_data: Dict[str, Any],
path: List[str],
branch_visited: set,
steps: List[FlowStep],
parent_outputs: Dict[str, Any] = None,
) -> None:
nonlocal branch_counter
# Skip if node is already in current path (cycle detection)
if current_node_id in path:
return
current_node = node_map.get(current_node_id)
if not current_node:
return
# Process node outputs
is_input_node = current_node.data.get("type") == "type"
if is_input_node:
outputs_array = current_node.data["outputs"].get("properties", [])
first_output_name = (
outputs_array[0].get("name", "output") if outputs_array else "output"
)
current_outputs = {first_output_name: initial_value}
else:
# Check if we already have outputs for this scanner
if current_node_id in scanner_outputs:
current_outputs = scanner_outputs[current_node_id]
else:
current_outputs = process_node_data(current_node, input_data)
# Store the outputs for future use
scanner_outputs[current_node_id] = current_outputs
# Extract node parameters
node_params = current_node.data.get("params", {})
# Create and add current step
current_step = create_step(
current_node_id,
branch_id,
depth,
input_data,
is_input_node,
current_outputs,
node_params,
)
steps.append(current_step)
path.append(current_node_id)
branch_visited.add(current_node_id)
# Get all outgoing edges sorted by path length
out_edges = get_outgoing_edges(current_node_id)
if not out_edges:
# Leaf node reached, save the branch
branches.append(FlowBranch(id=branch_id, name=branch_name, steps=steps[:]))
else:
# Process each outgoing edge in order of shortest path
for i, edge in enumerate(out_edges):
if edge.target in path: # Skip if would create cycle
continue
# Prepare next node's input
output_key = edge.sourceHandle
if not output_key and current_outputs:
output_key = list(current_outputs.keys())[0]
output_value = current_outputs.get(output_key) if output_key else None
if output_value is None and parent_outputs:
output_value = (
parent_outputs.get(output_key) if output_key else None
)
next_input = {edge.targetHandle or "input": output_value}
if i == 0:
# Continue in same branch (will be shortest path)
explore_branch(
edge.target,
branch_id,
branch_name,
depth + 1,
next_input,
path,
branch_visited,
steps,
current_outputs,
)
else:
# Create new branch starting from current node
branch_counter += 1
new_branch_id = f"{branch_id}-{branch_counter}"
new_branch_name = f"{branch_name} (Branch {branch_counter})"
new_steps = steps[: len(steps)] # Copy steps up to current node
new_branch_visited = (
branch_visited.copy()
) # Create new visited set for the branch
explore_branch(
edge.target,
new_branch_id,
new_branch_name,
depth + 1,
next_input,
path[:], # Create new path copy for branch
new_branch_visited,
new_steps,
current_outputs,
)
# Backtrack: remove current node from path and remove its step
path.pop()
steps.pop()
# Start exploration from each input node
for index, input_node in enumerate(input_nodes):
branch_id = f"branch-{index}"
branch_name = f"Flow {index + 1}" if len(input_nodes) > 1 else "Main Flow"
explore_branch(
input_node.id,
branch_id,
branch_name,
0,
{},
[], # Use list for path to maintain order
set(), # Use set for visited to check membership
[],
None,
)
# Sort branches by length (number of steps)
branches.sort(key=lambda branch: len(branch.steps))
return branches
def process_node_data(node: Node, inputs: Dict[str, Any]) -> Dict[str, Any]:
"""Traite les données de nœud en fonction du type de nœud et des entrées"""
outputs = {}
output_types = node.data["outputs"].get("properties", [])
for output in output_types:
output_name = output.get("name", "output")
class_name = node.data.get("class_name", "")
# For simulation purposes, we'll return a placeholder value based on the scanner type
if class_name in ["ReverseResolveScanner", "ResolveScanner"]:
# IP/Domain resolution scanners
outputs[output_name] = (
"192.168.1.1" if "ip" in output_name.lower() else "example.com"
)
elif class_name == "SubdomainScanner":
# Subdomain scanner
outputs[output_name] = f"sub.{inputs.get('input', 'example.com')}"
elif class_name == "WhoisScanner":
# WHOIS scanner
outputs[output_name] = {
"domain": inputs.get("input", "example.com"),
"registrar": "Example Registrar",
"creation_date": "2020-01-01",
}
elif class_name == "GeolocationScanner":
# Geolocation scanner
outputs[output_name] = {
"country": "France",
"city": "Paris",
"coordinates": {"lat": 48.8566, "lon": 2.3522},
}
elif class_name == "MaigretScanner":
# Social media scanner
outputs[output_name] = {
"username": inputs.get("input", "user123"),
"platforms": ["twitter", "github", "linkedin"],
}
elif class_name == "HoleheScanner":
# Email verification scanner
outputs[output_name] = {
"email": inputs.get("input", "user@example.com"),
"exists": True,
"platforms": ["gmail", "github"],
}
elif class_name == "SireneScanner":
# Organization scanner
outputs[output_name] = {
"name": inputs.get("input", "Example Corp"),
"siret": "12345678901234",
"address": "1 Example Street",
}
else:
# For unknown scanners, pass through the input
outputs[output_name] = inputs.get("input") or f"transformed_{output_name}"
return outputs

View File

@@ -0,0 +1,29 @@
from .base import ORMBase
from pydantic import UUID4, BaseModel
from typing import Optional
from datetime import datetime
from typing import List, Optional, Dict, Any
class FlowCreate(BaseModel):
    """Request body for creating a flow; only ``name`` is required."""

    name: str
    description: Optional[str] = None
    category: Optional[List[str]] = None  # category labels used for filtering
    # Serialized editor graph; the router reads its "nodes"/"edges" keys.
    flow_schema: Optional[Dict[str, Any]] = None
class FlowRead(ORMBase):
    """Response model for a Flow row read from the database.

    Inherits ORMBase so it can be populated directly from the SQLAlchemy
    `Flow` model (ORM attribute access).
    """

    # Primary key of the flow (UUID generated server-side).
    id: UUID4
    # Human-readable flow name.
    name: str
    # Free-text description; nullable in the database, hence Optional
    # with no default (the attribute is always present on reads).
    description: Optional[str]
    # Category tags; nullable text[] column.
    category: Optional[List[str]]
    # JSON graph definition of the flow; nullable JSON column.
    flow_schema: Optional[Dict[str, Any]]
    # Row creation timestamp (server_default=func.now() on the model).
    created_at: datetime
    # Timestamp of the last modification.
    last_updated_at: datetime
class FlowUpdate(BaseModel):
    """Request payload for partially updating an existing Flow.

    Every field is optional so callers can send only the attributes they
    want to change (PATCH-style semantics).
    """

    # New flow name, if being renamed.
    name: Optional[str] = None
    # New description, if being changed.
    description: Optional[str] = None
    # Replacement category tag list, if being changed.
    category: Optional[List[str]] = None
    # Replacement JSON graph definition, if being changed.
    flow_schema: Optional[Dict[str, Any]] = None

View File

@@ -3,6 +3,8 @@ from pydantic import UUID4, BaseModel
from typing import Optional
from datetime import datetime
from .sketch import SketchRead
from .analysis import AnalysisRead
from .profile import ProfileRead
class InvestigationCreate(BaseModel):
@@ -20,7 +22,9 @@ class InvestigationRead(ORMBase):
owner_id: Optional[UUID4]
last_updated_at: datetime
status: str
owner: Optional[ProfileRead] = None
sketches: list[SketchRead] = []
analyses: list[AnalysisRead] = []
class InvestigationProfileCreate(BaseModel):

View File

@@ -1,7 +1,6 @@
from .base import ORMBase
from pydantic import UUID4, BaseModel
from typing import Optional
from datetime import datetime
from typing import List, Optional, Dict, Any
@@ -9,21 +8,19 @@ class TransformCreate(BaseModel):
name: str
description: Optional[str] = None
category: Optional[List[str]] = None
transform_schema: Optional[Dict[str, Any]] = None
class TransformRead(ORMBase):
id: UUID4
name: str
class_name: str
description: Optional[str]
category: Optional[List[str]]
transform_schema: Optional[Dict[str, Any]]
created_at: datetime
last_updated_at: datetime
flow_schema: Optional[Dict[str, Any]]
class TransformUpdate(BaseModel):
name: Optional[str] = None
class_name: str = None
description: Optional[str] = None
category: Optional[List[str]] = None
transform_schema: Optional[Dict[str, Any]] = None

View File

@@ -1,7 +1,6 @@
from fastapi import FastAPI
from flowsint_core.core.graph_db import Neo4jConnection
import os
from typing import List
from dotenv import load_dotenv
from fastapi.middleware.cors import CORSMiddleware
@@ -10,6 +9,7 @@ from app.api.routes import auth
from app.api.routes import investigations
from app.api.routes import sketches
from app.api.routes import transforms
from app.api.routes import flows
from app.api.routes import events
from app.api.routes import analysis
from app.api.routes import chat
@@ -58,6 +58,7 @@ app.include_router(
investigations.router, prefix="/api/investigations", tags=["investigations"]
)
app.include_router(transforms.router, prefix="/api/transforms", tags=["transforms"])
app.include_router(flows.router, prefix="/api/flows", tags=["flows"])
app.include_router(events.router, prefix="/api/events", tags=["events"])
app.include_router(analysis.router, prefix="/api/analyses", tags=["analyses"])
app.include_router(chat.router, prefix="/api/chats", tags=["chats"])

View File

@@ -50,6 +50,7 @@ class Investigation(Base):
sketches = relationship("Sketch", back_populates="investigation")
analyses = relationship("Analysis", back_populates="investigation")
chats = relationship("Chat", back_populates="investigation")
owner = relationship("Profile", foreign_keys=[owner_id])
__table_args__ = (
Index("idx_investigations_id", "id"),
Index("idx_investigations_owner_id", "owner_id"),
@@ -194,8 +195,8 @@ class SketchesProfiles(Base):
)
class Transform(Base):
__tablename__ = "transforms"
class Flow(Base):
__tablename__ = "flows"
id: Mapped[uuid.UUID] = mapped_column(
PGUUID(as_uuid=True), primary_key=True, default=uuid.uuid4
@@ -203,7 +204,7 @@ class Transform(Base):
name = mapped_column(Text, nullable=False)
description = mapped_column(Text, nullable=True)
category = mapped_column(ARRAY(Text), nullable=True)
transform_schema = mapped_column(JSON, nullable=True)
flow_schema = mapped_column(JSON, nullable=True)
created_at = mapped_column(DateTime(timezone=True), server_default=func.now())
last_updated_at = mapped_column(DateTime(timezone=True), server_default=func.now())

View File

@@ -159,7 +159,7 @@ def resolve_type(details: dict, schema_context: dict = None) -> str:
return "any"
def extract_input_schema_transform(model: Type[BaseModel]) -> Dict[str, Any]:
def extract_input_schema_flow(model: Type[BaseModel]) -> Dict[str, Any]:
adapter = TypeAdapter(model)
schema = adapter.json_schema()
@@ -225,7 +225,7 @@ def extract_transform(transform: Dict[str, Any]) -> Dict[str, Any]:
if scanner_node and scanner_node["data"]["type"] == "scanner":
scanners.append(
{
"scanner_name": scanner_node["data"]["name"],
"transform_name": scanner_node["data"]["name"],
"module": scanner_node["data"]["module"],
"input": source_handle,
"output": target_handle,
@@ -238,7 +238,7 @@ def extract_transform(transform: Dict[str, Any]) -> Dict[str, Any]:
"outputs": input_output,
},
"scanners": scanners,
"scanner_names": [scanner["scanner_name"] for scanner in scanners],
"transform_names": [scanner["transform_name"] for scanner in scanners],
}