feat(api): use repositories in services

This commit is contained in:
dextmorgn
2026-02-07 17:23:16 +01:00
parent dd0c53f089
commit 177953b216
2 changed files with 117 additions and 57 deletions

View File

@@ -0,0 +1,107 @@
"""make column types portable (JSONB->JSON, ARRAY->JSON/TEXT)
Revision ID: a1b2c3d4e5f6
Revises: 8173aba964e7
Create Date: 2026-02-07 00:00:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "a1b2c3d4e5f6"
down_revision: Union[str, None] = "8173aba964e7"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# 1. logs.content: JSONB -> JSON
op.execute("ALTER TABLE logs ALTER COLUMN content TYPE JSON USING content::text::json")
# 2. custom_types.schema: JSONB -> JSON
op.execute(
'ALTER TABLE custom_types ALTER COLUMN "schema" TYPE JSON USING "schema"::text::json'
)
# 3. flows.category: ARRAY(Text) -> JSON
op.execute(
"""
ALTER TABLE flows ALTER COLUMN category TYPE JSON
USING CASE
WHEN category IS NULL THEN NULL
ELSE array_to_json(category)
END
"""
)
# 4. investigation_user_roles.roles: ARRAY(role_enum) -> TEXT (JSON string)
# Convert PostgreSQL enum array like {OWNER,EDITOR} to JSON string like '["owner","editor"]'
op.execute(
"""
ALTER TABLE investigation_user_roles ALTER COLUMN roles TYPE TEXT
USING CASE
WHEN roles IS NULL THEN '[]'
ELSE lower(array_to_json(roles::text[])::text)
END
"""
)
# Remove the server_default that used PostgreSQL array literal '{}'
op.alter_column("investigation_user_roles", "roles", server_default=None)
# Drop the role_enum type (no longer needed)
op.execute("DROP TYPE IF EXISTS role_enum")
def downgrade() -> None:
# Recreate the role_enum type
op.execute("CREATE TYPE role_enum AS ENUM ('OWNER', 'EDITOR', 'VIEWER')")
# 4. investigation_user_roles.roles: TEXT -> ARRAY(role_enum)
# Use a temp column to avoid subquery restriction in USING
op.execute("ALTER TABLE investigation_user_roles ADD COLUMN roles_tmp role_enum[]")
op.execute(
"""
UPDATE investigation_user_roles SET roles_tmp = CASE
WHEN roles IS NULL OR roles = '[]' THEN '{}'::role_enum[]
ELSE (
SELECT array_agg(upper(elem)::role_enum)
FROM json_array_elements_text(roles::json) AS elem
)
END
"""
)
op.execute("ALTER TABLE investigation_user_roles DROP COLUMN roles")
op.execute("ALTER TABLE investigation_user_roles RENAME COLUMN roles_tmp TO roles")
op.alter_column(
"investigation_user_roles", "roles", server_default=sa.text("'{}'")
)
# 3. flows.category: JSON -> ARRAY(Text)
# Use a temp column to avoid subquery restriction in USING
op.execute("ALTER TABLE flows ADD COLUMN category_tmp TEXT[]")
op.execute(
"""
UPDATE flows SET category_tmp = CASE
WHEN category IS NULL THEN NULL
ELSE (
SELECT array_agg(elem::text)
FROM json_array_elements_text(category::json) AS elem
)
END
"""
)
op.execute("ALTER TABLE flows DROP COLUMN category")
op.execute("ALTER TABLE flows RENAME COLUMN category_tmp TO category")
# 2. custom_types.schema: JSON -> JSONB
op.execute(
'ALTER TABLE custom_types ALTER COLUMN "schema" TYPE JSONB USING "schema"::jsonb'
)
# 1. logs.content: JSON -> JSONB
op.execute("ALTER TABLE logs ALTER COLUMN content TYPE JSONB USING content::jsonb")

View File

@@ -1,17 +1,12 @@
from fastapi import APIRouter, HTTPException, Depends, status
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import os
from mistralai import Mistral
from mistralai.models import UserMessage, AssistantMessage, SystemMessage
import json
from uuid import UUID, uuid4
from uuid import UUID
from typing import Dict, List, Optional
from datetime import datetime
from sqlalchemy.orm import Session
from flowsint_core.core.postgre_db import get_db
from flowsint_core.core.models import ChatMessage, Profile
from flowsint_core.core.models import Profile
from flowsint_core.core.services import (
create_chat_service,
NotFoundError,
@@ -59,63 +54,21 @@ async def stream_chat(
except NotFoundError:
raise HTTPException(status_code=404, detail="Chat not found")
# Add user message
service.add_user_message(chat_id, current_user.id, payload.prompt, payload.context)
# Prepare AI context
ai_context = service.prepare_ai_context(chat, payload.prompt, payload.context)
llm_messages = service.build_llm_messages(ai_context)
try:
api_key = os.environ.get("MISTRAL_API_KEY")
if not api_key:
raise HTTPException(
status_code=500, detail="Mistral API key not configured"
)
client = Mistral(api_key=api_key)
model = "mistral-small-latest"
accumulated_content = []
# Build messages for Mistral
messages = [
SystemMessage(
content="You are a CTI/OSINT investigator and you are trying to investigate on a variety of real life cases. Use your knowledge and analytics capabilities to analyse the context and answer the question the best you can. If you need to reference some items (an IP, a domain or something particular) please use the code brackets, like : `12.23.34.54` to reference it."
)
]
for message in ai_context["recent_messages"]:
if message.is_bot:
messages.append(
AssistantMessage(content=json.dumps(message.content, default=str))
)
else:
messages.append(
UserMessage(content=json.dumps(message.content, default=str))
)
if ai_context["context_message"]:
messages.append(SystemMessage(content=ai_context["context_message"]))
messages.append(UserMessage(content=ai_context["user_prompt"]))
async def generate():
response = await client.chat.stream_async(model=model, messages=messages)
async for chunk in response:
if chunk.data.choices[0].delta.content is not None:
content_chunk = chunk.data.choices[0].delta.content
accumulated_content.append(content_chunk)
yield f"data: {json.dumps({'content': content_chunk})}\n\n"
# Save bot message after streaming completes
service.add_bot_message(chat_id, "".join(accumulated_content))
yield "data: [DONE]\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
except Exception as e:
provider = service.get_llm_provider(current_user.id)
except ValueError as e:
raise HTTPException(status_code=500, detail=str(e))
return StreamingResponse(
service.stream_response(chat_id, llm_messages, provider),
media_type="text/event-stream",
)
@router.post("/create", response_model=ChatRead, status_code=status.HTTP_201_CREATED)
def create_chat(