[GH-ISSUE #133] [DRAFT] RFC: Graph Conductor - Concurrency, Locking, and Coordination for Multi-Agent Graph Mutations in Flowsint v2 #501

opened 2026-04-19 13:07:27 -05:00 by GiteaMirror · 4 comments

Originally created by @gustavorps on GitHub (Mar 26, 2026).
Original GitHub issue: https://github.com/reconurge/flowsint/issues/133

RFC: Graph Conductor - Concurrency, Locking, and Coordination for Multi-Agent Graph Mutations in Flowsint v2

RFC:        graph-conductor
Title:      Graph Conductor — Multi-Agent Coordination Layer for the Knowledge Graph
Author:     Gustavo R. P. de Souza <@gustavorps>
Status:     Draft
Created:    2026-03-26
Updated:    2026-03-26
Related:    reconurge/flowsint#127 (Agnostic Revert Feature)
            reconurge/flowsint#109 (Undo enrichment)
            reconurge/flowsint#130 (Plugin Discovery and Namespace Code Structure)
Superpowers: compatible (docs/plans/2026-03-26-graph-conductor-design.md)

Summary

Flowsint's knowledge graph currently allows Celery workers to write nodes and edges to Neo4j without coordination, locking, or rollback capability. As the platform evolves toward multi-agent enrichment — where multiple Claude agents simultaneously populate, enrich, expand, and delete graph regions — this lack of coordination creates race conditions, phantom reads, conflicting writes, and irrecoverable state corruption.

Graph Conductor is a new coordination layer inside flowsint-core that sits between agents/enrichers and Neo4j. It provides a command queue, graph region locking, optimistic concurrency control via version vectors, event sourcing for full reversibility, a claim-before-work protocol, role-scoped agent permissions, idempotent command execution, and a dead letter queue for failed mutations. Together, these mechanisms make the knowledge graph safe for concurrent multi-agent operation while enabling the "Agnostic Revert" capability requested in issue #127.


1. Motivation and problem statement

1.1 Current architecture gaps

Flowsint's existing mutation path is straightforward but unsafe under concurrency:

Agent/Enricher → Celery.delay() → Redis Queue → Worker → Transform.postprocess() → Neo4j WRITE

Three specific problems arise when multiple agents operate simultaneously:

Lost updates. Agent A reads a Domain node, enriches it with WHOIS data, and writes back. Agent B reads the same node before A's write, enriches it with DNS data, and writes back — overwriting A's WHOIS additions. Neo4j has no application-level optimistic concurrency control; the last writer wins silently.

Phantom graph regions. Agent A discovers 50 subdomains and begins creating nodes. Agent B starts a cleanup task that deletes "stale" nodes in the same workspace. B deletes nodes A is still linking to, leaving orphaned edges pointing at nonexistent targets.

Irreversible mutations. Issue #109 requests undo-enrichment capability. Issue #127 proposes an "Agnostic Revert Feature" — a mechanism to reverse any graph operation regardless of type. Today, once postprocess() writes to Neo4j, there is no record of what changed and no way to reverse it. The three-phase pipeline (preprocess → scan → postprocess) lacks a fourth phase: record what happened so it can be undone.

1.2 Why this matters for multi-agent OSINT

The move toward AI-powered investigation means multiple Claude agents will operate on the same investigation graph concurrently:

  • An Enrichment Agent runs domain-to-subdomains while an Attribution Agent runs WHOIS lookups on the same domain
  • An Expansion Agent discovers new IP ranges while a Validation Agent prunes false-positive nodes
  • A Triage Agent tags high-priority nodes while a Reporting Agent reads the graph for export

Without coordination, these agents corrupt each other's work. Graph Conductor solves this.

1.3 Relationship to existing issues

| Issue | Title | How Graph Conductor addresses it |
|-------|-------|----------------------------------|
| #127 | [WiP] RFC: Agnostic Revert Feature | Event sourcing provides operation-type-agnostic undo for any graph mutation |
| #109 | [Feature Request] Undo enrichment | Command journaling enables per-enrichment rollback via compensating commands |
| #130 | [PROPOSAL] Plugin Discovery and Namespace | Role-scoped permissions integrate with plugin namespace boundaries |
| #106 | [Feature Request] Add timestamps of events | Event sourcing naturally timestamps every mutation |
| #94 | Configurable API-Based Enricher Type | Idempotent commands make retry-safe enrichers possible for unreliable external APIs |

2. Architecture overview

Graph Conductor introduces a new module, flowsint-core/src/flowsint/core/conductor/, that intercepts all graph mutations before they reach Neo4j. It does not replace the existing Transform pipeline; it wraps postprocess() with coordination guarantees.

2.1 System context

graph TD
    subgraph Agents["Claude Agent Team"]
        A1["🔍 Enrichment Agent<br/><i>role: enrichment</i>"]
        A2["🌐 Expansion Agent<br/><i>role: expansion</i>"]
        A3["✅ Validation Agent<br/><i>role: validation</i>"]
        A4["🧹 Cleanup Agent<br/><i>role: cleanup</i>"]
        A5["🏷️ Triage Agent<br/><i>role: triage</i>"]
    end

    subgraph Conductor["Graph Conductor — flowsint-core/conductor"]
        direction TB

        ACL["Anti-Corruption Layer<br/><i>Validate · Normalize · Sanitize</i>"]

        subgraph Queue["Command Queue — Redis Sorted Sets"]
            Q1["Workspace A Queue<br/><i>priority-ordered</i>"]
            Q2["Workspace B Queue<br/><i>priority-ordered</i>"]
            Q3["Workspace N Queue<br/><i>priority-ordered</i>"]
        end

        IDEM["Idempotency Registry<br/><i>Redis SET NX · 24h TTL</i>"]
        PERM["Permission Check<br/><i>Role → CommandType matrix</i>"]

        subgraph Locking["Graph Region Locking — Redis"]
            RL["Node-Level Locks<br/><i>SET NX EX · sorted acquisition</i>"]
            SL["Workspace-Level Locks<br/><i>bulk operations only</i>"]
        end

        subgraph Executor["Conductor Executor"]
            VV["Version Vector Check<br/><i>WHERE n._version = $expected</i>"]
            SNAP_B["Snapshot before_state"]
            EXEC["Execute Cypher Mutation"]
            SNAP_A["Snapshot after_state"]
            VV --> SNAP_B --> EXEC --> SNAP_A
        end
    end

    subgraph Storage["Persistence Layer"]
        NEO["Neo4j 5 + APOC<br/><i>Knowledge Graph</i>"]
        PG["PostgreSQL<br/><i>Event Store · DLQ</i>"]
    end

    subgraph Outputs["Downstream"]
        WS["WebSocket Stream<br/><i>Real-time frontend events</i>"]
        DLQ["Dead Letter Queue<br/><i>Failed commands</i>"]
        PUBSUB["Redis Pub/Sub<br/><i>conductor:events:{workspace_id}</i>"]
    end

    A1 & A2 & A3 & A4 & A5 -->|"POST /api/.../conductor/commands"| ACL
    ACL -->|"validated commands"| Queue
    Queue --> IDEM
    IDEM -->|"new command"| PERM
    IDEM -.->|"duplicate → cached result"| Agents
    PERM -->|"authorized"| Locking
    PERM -.->|"denied"| DLQ
    Locking -->|"lock acquired"| Executor
    Locking -.->|"lock timeout after retries"| DLQ
    EXEC -->|"Cypher + version bump"| NEO
    SNAP_A -->|"record event"| PG
    SNAP_A --> PUBSUB
    PUBSUB --> WS
    DLQ --> PG

    style Agents fill:transparent,stroke:#7F77DD,stroke-width:1.5px
    style Conductor fill:transparent,stroke:#D85A30,stroke-width:1.5px
    style Queue fill:transparent,stroke:#1D9E75,stroke-width:1px
    style Locking fill:transparent,stroke:#BA7517,stroke-width:1px
    style Executor fill:transparent,stroke:#185FA5,stroke-width:1px
    style Storage fill:transparent,stroke:#5F5E5A,stroke-width:1.5px
    style Outputs fill:transparent,stroke:#993556,stroke-width:1.5px

    style A1 fill:#EEEDFE,stroke:#534AB7,color:#3C3489
    style A2 fill:#EEEDFE,stroke:#534AB7,color:#3C3489
    style A3 fill:#EEEDFE,stroke:#534AB7,color:#3C3489
    style A4 fill:#EEEDFE,stroke:#534AB7,color:#3C3489
    style A5 fill:#EEEDFE,stroke:#534AB7,color:#3C3489

    style ACL fill:#FAECE7,stroke:#993C1D,color:#712B13
    style Q1 fill:#E1F5EE,stroke:#0F6E56,color:#085041
    style Q2 fill:#E1F5EE,stroke:#0F6E56,color:#085041
    style Q3 fill:#E1F5EE,stroke:#0F6E56,color:#085041
    style IDEM fill:#E1F5EE,stroke:#0F6E56,color:#085041
    style PERM fill:#FAECE7,stroke:#993C1D,color:#712B13
    style RL fill:#FAEEDA,stroke:#854F0B,color:#633806
    style SL fill:#FAEEDA,stroke:#854F0B,color:#633806
    style VV fill:#E6F1FB,stroke:#185FA5,color:#0C447C
    style SNAP_B fill:#E6F1FB,stroke:#185FA5,color:#0C447C
    style EXEC fill:#E6F1FB,stroke:#185FA5,color:#0C447C
    style SNAP_A fill:#E6F1FB,stroke:#185FA5,color:#0C447C

    style NEO fill:#EAF3DE,stroke:#3B6D11,color:#27500A
    style PG fill:#F1EFE8,stroke:#5F5E5A,color:#444441
    style WS fill:#FBEAF0,stroke:#993556,color:#72243E
    style DLQ fill:#FCEBEB,stroke:#A32D2D,color:#791F1F
    style PUBSUB fill:#FBEAF0,stroke:#993556,color:#72243E

2.2 Module placement

Graph Conductor lives inside flowsint-core and depends only on flowsint-types. It does not depend on flowsint-enrichers or flowsint-api, preserving the existing dependency hierarchy:

flowsint-types          (unchanged — Pydantic entity models)
    ↑
flowsint-core/conductor (NEW — this RFC)
    ↑
flowsint-core           (existing — orchestrator, tasks, vault, DB connectors)
    ↑
flowsint-enrichers      (unchanged — enricher implementations)
    ↑
flowsint-api            (unchanged — FastAPI routes)

2.3 New directory structure

flowsint-core/src/flowsint/core/conductor/
├── __init__.py
├── commands.py           # GraphCommand base class + concrete command types
├── command_queue.py      # Redis-backed FIFO command queue
├── region_lock.py        # Graph region locking with Redis distributed locks
├── version_vector.py     # Per-node version vector implementation
├── executor.py           # ConductorExecutor — the core orchestration loop
├── event_store.py        # PostgreSQL-backed event journal
├── event_types.py        # Event schema definitions (Pydantic)
├── permissions.py        # Role-scoped agent permission checks
├── dead_letter.py        # Dead letter queue for failed commands
├── idempotency.py        # Idempotency key registry
└── anti_corruption.py    # ACL boundary between agents and Neo4j

3. Command queue pattern

3.1 Design

Every graph mutation is expressed as a GraphCommand — a serializable, immutable data object that describes what should happen without executing it. Commands enter a Redis-backed FIFO queue partitioned by workspace ID, ensuring that mutations within a single investigation are ordered while mutations across investigations execute in parallel.

3.2 Command types

from enum import Enum
from pydantic import BaseModel, Field
from datetime import datetime
from typing import Optional
import uuid


class CommandType(str, Enum):
    CREATE_NODE = "create_node"
    UPDATE_NODE = "update_node"
    DELETE_NODE = "delete_node"
    CREATE_EDGE = "create_edge"
    UPDATE_EDGE = "update_edge"
    DELETE_EDGE = "delete_edge"
    BATCH_MUTATION = "batch_mutation"


class GraphCommand(BaseModel):
    """Immutable graph mutation command. Created by agents, executed by Conductor."""

    command_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    command_type: CommandType
    workspace_id: str
    owner_id: str
    agent_id: str
    agent_role: str                          # e.g., "enrichment", "validation", "expansion"
    idempotency_key: str                     # Client-generated; prevents duplicate execution
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    target_node_ids: list[str] = []          # Nodes this command reads or writes
    target_edge_ids: list[str] = []          # Edges this command reads or writes
    expected_versions: dict[str, int] = {}   # node_id → expected version for OCC
    payload: dict                            # Command-specific data (entity type, properties, etc.)
    priority: int = 0                        # Higher = processed first within queue partition
    ttl_seconds: int = 300                   # Command expires if not executed within TTL
    causation_id: Optional[str] = None       # ID of the command that caused this one
    correlation_id: Optional[str] = None     # Groups related commands (e.g., one enrichment run)

    def is_expired(self) -> bool:
        """TTL check used by the Conductor Executor (section 10) before locking."""
        return (datetime.utcnow() - self.timestamp).total_seconds() > self.ttl_seconds

3.3 Queue mechanics

Commands are enqueued to Redis sorted sets keyed by conductor:queue:{workspace_id}, scored by (priority, timestamp). The Conductor Executor polls queues using BZPOPMIN for blocking dequeue with automatic priority ordering.

class CommandQueue:
    """Redis-backed priority command queue, partitioned by workspace."""

    QUEUE_PREFIX = "conductor:queue:"

    async def enqueue(self, command: GraphCommand) -> None:
        key = f"{self.QUEUE_PREFIX}{command.workspace_id}"
        score = (-command.priority * 1e12) + command.timestamp.timestamp()
        await self.redis.zadd(key, {command.model_dump_json(): score})

    async def dequeue(self, workspace_id: str, timeout: float = 5.0) -> GraphCommand | None:
        key = f"{self.QUEUE_PREFIX}{workspace_id}"
        result = await self.redis.bzpopmin(key, timeout=timeout)
        if result:
            _, payload, _ = result
            return GraphCommand.model_validate_json(payload)
        return None

3.4 Integration with existing Transform pipeline

The current Transform.postprocess() method writes directly to Neo4j. Graph Conductor wraps this by introducing ConductorAwareTransform:

class ConductorAwareTransform(Transform):
    """Transform subclass that routes mutations through Graph Conductor."""

    async def postprocess(self, scan_results: dict) -> list[GraphCommand]:
        """Instead of writing to Neo4j directly, emit GraphCommands."""
        commands = self.build_commands(scan_results)
        for cmd in commands:
            await self.conductor.enqueue(cmd)
        return commands

    def build_commands(self, scan_results: dict) -> list[GraphCommand]:
        """Subclasses override to produce commands from scan output."""
        raise NotImplementedError

Existing enrichers continue to work unchanged — the TransformOrchestrator detects whether the enricher extends ConductorAwareTransform or the legacy Transform and routes accordingly. This provides a non-breaking migration path.
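
A minimal sketch of that routing decision (the class bodies here are stubs; the real TransformOrchestrator internals are not shown in this RFC):

```python
# Routing sketch: stubs stand in for the real Transform classes; only the
# isinstance() dispatch reflects the migration path described above.
class Transform:
    """Stub for the existing legacy Transform base class."""
    def postprocess(self, scan_results: dict) -> str:
        return "direct-neo4j-write"


class ConductorAwareTransform(Transform):
    """Stub for the conductor-aware subclass introduced by this RFC."""
    def postprocess(self, scan_results: dict) -> str:
        return "enqueued-commands"


def route(transform: Transform, scan_results: dict) -> tuple[str, str]:
    # Conductor-aware enrichers are detected first; everything else
    # falls through to the legacy direct-write path unchanged.
    if isinstance(transform, ConductorAwareTransform):
        return ("conductor", transform.postprocess(scan_results))
    return ("legacy", transform.postprocess(scan_results))
```

Because the check is a plain isinstance test, no enricher registration changes are needed to opt in: subclassing ConductorAwareTransform is the opt-in.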


4. Graph region locking

4.1 Region definition

A graph region is a set of node IDs and edge IDs that a command intends to read or write. Regions are declared upfront in target_node_ids and target_edge_ids on each GraphCommand. This is the claim-before-work pattern: agents must declare their intent before execution begins.

4.2 Lock granularity

The locking system operates at two granularities:

| Granularity | Lock key pattern | Use case |
|-------------|------------------|----------|
| Node-level | `lock:node:{node_id}` | Single-node enrichments, property updates |
| Workspace-level | `lock:workspace:{workspace_id}` | Bulk operations, graph-wide cleanup/export |

Node-level locks use Redis SET NX EX (set-if-not-exists with expiry) for distributed mutual exclusion. Workspace-level locks are acquired as a Redis SETNX on the workspace key and block all node-level locks within that workspace.
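
The two-level blocking rule can be sketched with a plain dict standing in for Redis (a present key means the lock is held; inserting a missing key plays the role of SET NX; TTLs are omitted):

```python
# Illustrative only: a dict replaces Redis, and expiry is omitted.
def try_node_lock(store: dict, workspace_id: str, node_id: str, owner: str) -> bool:
    # A held workspace-level lock blocks every node-level lock beneath it.
    if f"lock:workspace:{workspace_id}" in store:
        return False
    key = f"lock:node:{node_id}"
    if key in store:
        return False
    store[key] = owner  # SET NX equivalent
    return True


def try_workspace_lock(store: dict, workspace_id: str, owner: str) -> bool:
    key = f"lock:workspace:{workspace_id}"
    if key in store:
        return False
    store[key] = owner
    return True
```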

4.3 Lock acquisition protocol

import asyncio


class RegionLock:
    """Distributed graph region lock using Redis."""

    LOCK_TTL = 30  # seconds; auto-expires to prevent deadlocks
    RETRY_DELAY = 0.1
    MAX_RETRIES = 50

    async def acquire(self, command: GraphCommand) -> bool:
        """Acquire locks for all nodes/edges in the command's region.
        Uses sorted-order acquisition to prevent deadlocks."""
        targets = sorted(command.target_node_ids + command.target_edge_ids)
        acquired = []

        try:
            for target_id in targets:
                key = f"lock:node:{target_id}"
                for attempt in range(self.MAX_RETRIES):
                    if await self.redis.set(key, command.command_id, nx=True, ex=self.LOCK_TTL):
                        acquired.append(key)
                        break
                    await asyncio.sleep(self.RETRY_DELAY * (attempt + 1))
                else:
                    # Failed to acquire — release all and retry entire command
                    await self._release_all(acquired, command.command_id)
                    return False
            return True
        except Exception:
            await self._release_all(acquired, command.command_id)
            raise

    async def release(self, command: GraphCommand) -> None:
        """Release all locks held by this command."""
        targets = sorted(command.target_node_ids + command.target_edge_ids)
        lock_keys = [f"lock:node:{tid}" for tid in targets]
        await self._release_all(lock_keys, command.command_id)

    async def _release_all(self, keys: list[str], owner: str) -> None:
        """Release locks only if still owned by this command (compare-and-delete)."""
        for key in keys:
            # Lua script for atomic compare-and-delete
            await self.redis.eval(
                "if redis.call('get', KEYS[1]) == ARGV[1] then "
                "return redis.call('del', KEYS[1]) else return 0 end",
                1, key, owner
            )

Deadlock prevention: Locks are always acquired in sorted order of target IDs, which rules out the circular-wait condition; TTL-based auto-expiry covers the remaining case of a crashed lock holder.

4.4 Claim-before-work pattern

Agents must declare the graph region they intend to mutate before performing any external OSINT calls. This happens during the enricher's preprocess() phase:

1. Agent receives task (e.g., "enrich domain example.com with subdomains")
2. preprocess() identifies target: node_id="domain-abc123"
3. Agent issues a CLAIM command declaring intent to write to region {domain-abc123}
4. Conductor validates the claim against the agent's role permissions
5. Conductor acquires region lock
6. Agent proceeds with scan() — external API calls
7. Agent issues mutation commands within the claimed region
8. Conductor executes commands and releases lock

If a claim cannot be granted (region already locked), the agent receives a REGION_BUSY response with the lock holder's agent ID and estimated completion time. The agent can then choose to wait, skip, or escalate.
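
Agent-side handling of that response might look like the following sketch (the response field names "status" and "estimated_completion_seconds" are assumptions, not a finalized Conductor contract):

```python
# Hypothetical claim-response handler for the wait/skip/escalate choice above.
def handle_claim_response(response: dict, max_wait_seconds: float = 60.0) -> str:
    """Return the agent's next action: proceed, wait, skip, or escalate."""
    if response["status"] == "GRANTED":
        return "proceed"
    if response["status"] == "REGION_BUSY":
        eta = response.get("estimated_completion_seconds")
        if eta is not None and eta <= max_wait_seconds:
            return "wait"   # holder finishes soon; poll the claim again
        return "skip"       # work on something else, retry this region later
    return "escalate"       # unexpected status: surface to a supervisor
```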


5. Optimistic concurrency control with version vectors

5.1 Version vector design

Every node and edge in the Neo4j graph carries a _version integer property. When an agent reads a node, it captures the current version. When it submits a mutation command, it includes expected_versions mapping each target to the version it read. The Conductor checks these versions at execution time.

class VersionVector:
    """Manages per-entity version tracking in Neo4j."""

    async def check_and_increment(
        self, neo4j_session, node_id: str, expected_version: int
    ) -> bool:
        """Atomically check version and increment. Returns False on conflict."""
        result = await neo4j_session.run(
            """
            MATCH (n {id: $node_id})
            WHERE n._version = $expected_version
            SET n._version = $expected_version + 1
            RETURN n._version AS new_version
            """,
            node_id=node_id,
            expected_version=expected_version,
        )
        record = await result.single()
        return record is not None

    async def get_version(self, node_id: str) -> int | None:
        """Read the current version without modifying it (used by the executor)."""
        async with self.neo4j.session() as session:
            result = await session.run(
                "MATCH (n {id: $node_id}) RETURN n._version AS version",
                node_id=node_id,
            )
            record = await result.single()
            return record["version"] if record else None

5.2 Conflict resolution strategy

When a version mismatch occurs:

  1. Default: Reject and retry. The command is returned to the queue with an incremented retry counter. The agent receives a VERSION_CONFLICT event containing the current node state, allowing it to rebase its changes.
  2. Merge mode (opt-in). For additive operations (appending new properties without overwriting existing ones), the Conductor can auto-merge on a per-property basis, applying incoming properties only where the node has no existing value. The agent opts in by setting payload.merge_strategy = "additive".
  3. Force mode (privileged). Agents with the admin role can force-write, bypassing version checks. This is logged as a FORCE_WRITE event for audit.
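
The additive merge of strategy 2 can be sketched as follows (the helper name is illustrative): incoming properties are applied only where the node does not already carry a value.

```python
# Sketch of the opt-in additive merge: new properties are appended,
# existing properties are never overwritten.
def additive_merge(current: dict, incoming: dict) -> dict:
    merged = dict(current)
    for key, value in incoming.items():
        if key not in merged:  # the existing value always wins
            merged[key] = value
    return merged
```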

5.3 Neo4j migration

A migration in neo4j-migrations/ adds the _version property to all existing nodes:

// Migration: 001_add_version_vectors.cypher
MATCH (n)
WHERE n._version IS NULL
SET n._version = 0
RETURN count(n) AS nodes_versioned;

// Create index for version lookups
CREATE INDEX node_version_idx IF NOT EXISTS FOR (n:Entity) ON (n._version);

6. Event sourcing

6.1 Event store design

Every command execution produces an event — an immutable record of what changed. Events are stored in PostgreSQL (alongside existing user/chat data) in an append-only graph_events table. This provides the foundation for the "Agnostic Revert" capability requested in issue #127.

CREATE TABLE graph_events (
    event_id        UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    command_id      UUID NOT NULL,
    command_type    VARCHAR(50) NOT NULL,
    workspace_id    UUID NOT NULL,
    owner_id        UUID NOT NULL,
    agent_id        VARCHAR(255) NOT NULL,
    agent_role      VARCHAR(50) NOT NULL,
    correlation_id  UUID,                          -- Groups events from one enrichment run
    causation_id    UUID,                          -- The command that triggered this event
    timestamp       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    target_node_ids TEXT[] NOT NULL DEFAULT '{}',
    target_edge_ids TEXT[] NOT NULL DEFAULT '{}',
    before_state    JSONB,                         -- Snapshot of affected entities BEFORE mutation
    after_state     JSONB,                         -- Snapshot of affected entities AFTER mutation
    delta           JSONB NOT NULL,                -- Minimal diff of what changed
    is_reverted     BOOLEAN NOT NULL DEFAULT FALSE,
    reverted_by     UUID,                          -- Points to the compensating event
    metadata        JSONB DEFAULT '{}'
);

CREATE INDEX idx_events_workspace ON graph_events (workspace_id, timestamp DESC);
CREATE INDEX idx_events_correlation ON graph_events (correlation_id);
CREATE INDEX idx_events_node ON graph_events USING GIN (target_node_ids);

6.2 Event lifecycle

Command submitted → Conductor acquires lock → Version check passes
  → Snapshot before_state from Neo4j
  → Execute Cypher mutation
  → Snapshot after_state from Neo4j
  → Compute delta (before_state → after_state)
  → Write event to graph_events table
  → Release lock
  → Emit event to Redis Pub/Sub for real-time frontend updates
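
The "Compute delta" step can be sketched as a per-property diff of the two snapshots (the helper name is illustrative):

```python
# Minimal per-property diff between before_state and after_state; the
# resulting dict is what would land in the delta column of graph_events.
def compute_delta(before: dict, after: dict) -> dict:
    delta = {"added": {}, "removed": {}, "changed": {}}
    for key, value in after.items():
        if key not in before:
            delta["added"][key] = value
        elif before[key] != value:
            delta["changed"][key] = {"from": before[key], "to": value}
    for key, value in before.items():
        if key not in after:
            delta["removed"][key] = value
    return delta
```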

6.3 Agnostic revert (addressing issue #127)

Reverting any operation is operation-type-agnostic because every event contains before_state. To revert:

class EventStore:
    async def revert_event(self, event_id: str) -> GraphCommand:
        """Generate a compensating command that undoes an event."""
        event = await self.get_event(event_id)

        if event.is_reverted:
            raise AlreadyRevertedError(event_id)

        compensating_command = GraphCommand(
            command_type=self._inverse_type(event.command_type),
            workspace_id=event.workspace_id,
            owner_id=event.owner_id,
            agent_id="system:revert",
            agent_role="admin",
            idempotency_key=f"revert:{event.event_id}",
            payload=event.before_state,
            causation_id=event.command_id,
            correlation_id=event.correlation_id,
        )
        return compensating_command

    async def revert_correlation(self, correlation_id: str) -> list[GraphCommand]:
        """Revert ALL events from a single enrichment run, in reverse order."""
        events = await self.get_events_by_correlation(
            correlation_id, order="timestamp DESC"
        )
        return [await self.revert_event(e.event_id) for e in events]

This directly solves issue #127 ("Agnostic Revert Feature") because:

  • The revert mechanism works identically for node creates, updates, deletes, edge creates, and batch mutations
  • revert_correlation() undoes an entire enrichment run (solving issue #109 "Undo enrichment")
  • The before_state/after_state snapshots are self-contained — no enricher-specific logic needed
  • Reverts are themselves events, creating a full audit trail

6.4 Time-travel queries

The event store enables investigation timeline reconstruction:

async def graph_state_at(self, workspace_id: str, timestamp: datetime) -> dict:
    """Reconstruct graph state at any point in time by replaying events."""
    events = await self.get_events(workspace_id, before=timestamp, order="timestamp ASC")
    state = {}
    for event in events:
        if not event.is_reverted:
            self._apply_delta(state, event.delta)
    return state

7. Idempotent commands

7.1 Problem

External OSINT APIs are unreliable. Network timeouts, Celery worker restarts, and agent retries can cause the same enrichment to be submitted multiple times. Without idempotency, a domain_to_subdomains enrichment could create duplicate nodes.

7.2 Idempotency key registry

Every GraphCommand carries an idempotency_key generated by the agent. The Conductor checks this key against a Redis-backed registry before execution:

class IdempotencyRegistry:
    """Prevents duplicate command execution using client-generated keys."""

    KEY_PREFIX = "idempotency:"
    KEY_TTL = 86400  # 24 hours

    async def check_and_register(self, key: str, command_id: str) -> bool:
        """Returns True if this key is new (safe to execute). False if duplicate."""
        registered = await self.redis.set(
            f"{self.KEY_PREFIX}{key}",
            command_id,
            nx=True,
            ex=self.KEY_TTL
        )
        return registered is not None  # True = new key, False = duplicate

    async def get_result(self, key: str) -> dict | None:
        """Retrieve the cached result of a previously executed command."""
        result = await self.redis.get(f"{self.KEY_PREFIX}{key}:result")
        return json.loads(result) if result else None

7.3 Key generation convention

Agents generate idempotency keys using a deterministic hash of their intent:

import hashlib

def make_idempotency_key(agent_id: str, enricher_name: str, entity_id: str, params_hash: str) -> str:
    """Deterministic key: same agent + same enricher + same entity + same params = same key."""
    raw = f"{agent_id}:{enricher_name}:{entity_id}:{params_hash}"
    return hashlib.sha256(raw.encode()).hexdigest()

When a duplicate is detected, the Conductor returns the cached result of the original execution without re-running the command.
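
For example, hashing the parameters with sorted keys keeps params_hash stable across dict orderings, so a retried enrichment reuses the original key (the parameter names are illustrative; make_idempotency_key is repeated so the snippet runs standalone):

```python
import hashlib
import json

def make_idempotency_key(agent_id: str, enricher_name: str, entity_id: str, params_hash: str) -> str:
    raw = f"{agent_id}:{enricher_name}:{entity_id}:{params_hash}"
    return hashlib.sha256(raw.encode()).hexdigest()

# sort_keys=True makes the hash independent of dict insertion order.
params = {"depth": 2, "source": "crt.sh"}
params_hash = hashlib.sha256(json.dumps(params, sort_keys=True).encode()).hexdigest()
key = make_idempotency_key("agent-1", "domain_to_subdomains", "domain-abc123", params_hash)
```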


8. Role-scoped agent permissions

8.1 Agent roles

Each Claude agent operates under a declared role that constrains what graph mutations it can perform. Roles are defined in a permission matrix:

| Role | CREATE nodes | UPDATE nodes | DELETE nodes | CREATE edges | DELETE edges | Bulk ops | Force write |
|------------|---|---|---|---|---|---|---|
| enrichment | ✓ | ✓ |   | ✓ |   |   |   |
| validation |   | ✓ |   |   |   |   |   |
| expansion  | ✓ |   |   | ✓ |   |   |   |
| cleanup    |   |   | ✓ |   | ✓ | ✓ |   |
| triage     |   | ✓ |   |   |   |   |   |
| admin      | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| readonly   |   |   |   |   |   |   |   |

8.2 Permission enforcement

class AgentPermissions:
    """Enforces role-based command authorization."""

    PERMISSION_MATRIX: dict[str, set[CommandType]] = {
        "enrichment": {CommandType.CREATE_NODE, CommandType.UPDATE_NODE, CommandType.CREATE_EDGE},
        "validation": {CommandType.UPDATE_NODE},
        "expansion": {CommandType.CREATE_NODE, CommandType.CREATE_EDGE},
        "cleanup": {CommandType.DELETE_NODE, CommandType.DELETE_EDGE, CommandType.BATCH_MUTATION},
        "triage": {CommandType.UPDATE_NODE},
        "admin": set(CommandType),  # All permissions
        "readonly": set(),  # No mutations allowed
    }

    def authorize(self, command: GraphCommand) -> bool:
        allowed = self.PERMISSION_MATRIX.get(command.agent_role, set())
        if command.command_type not in allowed:
            raise PermissionDeniedError(
                f"Agent role '{command.agent_role}' cannot execute '{command.command_type}'. "
                f"Allowed: {allowed}"
            )
        return True

8.3 Entity-type restrictions (optional extension)

Roles can be further restricted by entity type. For example, a crypto_enrichment agent might only be allowed to create CryptoWallet and CryptoTransaction nodes. This maps naturally to flowsint-types' Pydantic model hierarchy:

ENTITY_TYPE_RESTRICTIONS: dict[str, set[str]] = {
    "crypto_enrichment": {"CryptoWallet", "CryptoNFT", "CryptoTransaction"},
    "domain_enrichment": {"Domain", "IP", "ASN", "CIDR", "Website"},
    "social_enrichment": {"Individual", "SocialProfile", "Email", "Phone"},
}
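
Enforcement could sit next to the command-type check, as in this sketch (the function name and the rule that unlisted roles are unrestricted are assumptions):

```python
# Hypothetical entity-type check layered on top of the permission matrix.
ENTITY_TYPE_RESTRICTIONS: dict[str, set[str]] = {
    "crypto_enrichment": {"CryptoWallet", "CryptoNFT", "CryptoTransaction"},
    "domain_enrichment": {"Domain", "IP", "ASN", "CIDR", "Website"},
    "social_enrichment": {"Individual", "SocialProfile", "Email", "Phone"},
}

def authorize_entity_type(agent_role: str, entity_type: str) -> bool:
    allowed = ENTITY_TYPE_RESTRICTIONS.get(agent_role)
    if allowed is None:
        return True  # role carries no entity-type restriction
    return entity_type in allowed
```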

9. Dead letter queue

9.1 Purpose

Commands that fail after all retries — due to persistent version conflicts, permission errors, Neo4j connectivity issues, or malformed payloads — are moved to a dead letter queue (DLQ) instead of being silently dropped.

9.2 Implementation

class DeadLetterQueue:
    """Stores permanently failed commands for inspection and manual retry."""

    DLQ_KEY = "conductor:dlq:{workspace_id}"

    async def enqueue(self, command: GraphCommand, failure_reason: str, attempts: int) -> None:
        entry = {
            "command": command.model_dump(),
            "failure_reason": failure_reason,
            "attempts": attempts,
            "failed_at": datetime.utcnow().isoformat(),
        }
        await self.redis.lpush(
            f"conductor:dlq:{command.workspace_id}",
            json.dumps(entry)
        )
        # Also write to PostgreSQL for durable storage
        await self.pg_pool.execute(
            """INSERT INTO dead_letter_commands
               (command_id, workspace_id, owner_id, command_data, failure_reason, attempts, failed_at)
               VALUES ($1, $2, $3, $4, $5, $6, NOW())""",
            command.command_id, command.workspace_id, command.owner_id,
            json.dumps(command.model_dump()), failure_reason, attempts,
        )

    async def inspect(self, workspace_id: str, limit: int = 50) -> list[dict]:
        """List failed commands for a workspace — exposed via API for debugging."""
        return await self.pg_pool.fetch(
            "SELECT * FROM dead_letter_commands WHERE workspace_id = $1 ORDER BY failed_at DESC LIMIT $2",
            workspace_id, limit,
        )

    async def retry(self, command_id: str) -> None:
        """Re-enqueue a dead-lettered command for another attempt."""
        record = await self.pg_pool.fetchrow(
            "SELECT command_data FROM dead_letter_commands WHERE command_id = $1", command_id
        )
        command = GraphCommand.model_validate_json(record["command_data"])
        command.idempotency_key = f"retry:{command.command_id}:{uuid.uuid4()}"
        await self.command_queue.enqueue(command)

9.3 DLQ API endpoints

New endpoints in flowsint-api for DLQ management:

GET  /api/workspace/{workspace_id}/conductor/dlq          — List dead-lettered commands
POST /api/workspace/{workspace_id}/conductor/dlq/{id}/retry — Retry a specific command
DELETE /api/workspace/{workspace_id}/conductor/dlq/{id}     — Dismiss a dead-lettered command

10. Conductor executor — the orchestration loop

10.1 Execution flow

The Conductor Executor is the central loop that ties all components together. It runs as a Celery task (or standalone asyncio worker) alongside the existing Celery workers:

class ConductorExecutor:
    """Core orchestration loop for Graph Conductor."""

    MAX_RETRIES = 3
    RETRY_BACKOFF = [1.0, 2.0, 5.0]

    async def run(self, workspace_id: str):
        """Main execution loop for a single workspace partition."""
        while True:
            command = await self.command_queue.dequeue(workspace_id)
            if not command:
                continue

            try:
                await self.execute(command)
            except Exception as e:
                logger.error(f"Conductor error: {e}", extra={"command_id": command.command_id})

    async def execute(self, command: GraphCommand, attempt: int = 0) -> None:
        """Execute a single command with full coordination guarantees."""

        # 1. Idempotency check
        is_new = await self.idempotency.check_and_register(
            command.idempotency_key, command.command_id
        )
        if not is_new:
            cached = await self.idempotency.get_result(command.idempotency_key)
            if cached is not None:
                await self.notify_agent(command, "DUPLICATE", cached)
                return
            # No cached result yet: the same command is retrying (e.g., after
            # a lock timeout), so fall through and execute it.

        # 2. Permission check
        try:
            self.permissions.authorize(command)
        except PermissionDeniedError as e:
            await self.dead_letter.enqueue(command, str(e), attempt)
            await self.notify_agent(command, "PERMISSION_DENIED", str(e))
            return

        # 3. TTL check
        if command.is_expired():
            await self.dead_letter.enqueue(command, "EXPIRED", attempt)
            return

        # 4. Acquire region lock
        locked = await self.region_lock.acquire(command)
        if not locked:
            if attempt < self.MAX_RETRIES:
                await asyncio.sleep(self.RETRY_BACKOFF[attempt])
                await self.execute(command, attempt + 1)
            else:
                await self.dead_letter.enqueue(command, "LOCK_TIMEOUT", attempt)
            return

        try:
            # 5. Version vector check
            for node_id, expected in command.expected_versions.items():
                current = await self.version_vector.get_version(node_id)
                if current != expected:
                    if attempt < self.MAX_RETRIES:
                        # Lock release is handled by the finally block below
                        await self.notify_agent(command, "VERSION_CONFLICT", {
                            "node_id": node_id, "expected": expected, "actual": current
                        })
                        return  # Agent must rebase and resubmit
                    else:
                        await self.dead_letter.enqueue(command, "VERSION_CONFLICT", attempt)
                        return

            # 6. Snapshot before_state
            before_state = await self.snapshot_targets(command)

            # 7. Execute Cypher mutation in Neo4j transaction
            async with self.neo4j.session() as session:
                async with session.begin_transaction() as tx:
                    await self.apply_mutation(tx, command)
                    # Increment versions for all written nodes
                    for node_id in command.target_node_ids:
                        await self.version_vector.increment(tx, node_id)
                    await tx.commit()

            # 8. Snapshot after_state
            after_state = await self.snapshot_targets(command)

            # 9. Write event to event store
            event = await self.event_store.record(
                command=command,
                before_state=before_state,
                after_state=after_state,
            )

            # 10. Cache result for idempotency
            await self.idempotency.store_result(command.idempotency_key, event.event_id)

            # 11. Publish event for real-time frontend updates
            await self.redis.publish(
                f"conductor:events:{command.workspace_id}",
                event.model_dump_json()
            )

            # 12. Notify agent of success
            await self.notify_agent(command, "SUCCESS", event.event_id)

        finally:
            # 13. Always release lock
            await self.region_lock.release(command)

10.2 Execution guarantees

| Property                   | Mechanism                                         |
|----------------------------|---------------------------------------------------|
| At-most-once execution     | Idempotency key registry prevents duplicates      |
| Ordered within workspace   | Redis sorted set queue per workspace_id           |
| Parallel across workspaces | Separate queue partitions processed concurrently  |
| No deadlocks               | Sorted lock acquisition + TTL auto-expiry         |
| Conflict detection         | Version vectors catch stale reads                 |
| Full reversibility         | Event store captures before/after state           |
| Failure visibility         | Dead letter queue captures all terminal failures  |
| Permission enforcement     | Role check before lock acquisition (fail fast)    |
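The at-most-once guarantee rests on the idempotency registry the executor consults in step 1. The registry's interface is not spelled out in this RFC; a minimal sketch of its semantics follows, with an in-memory dict standing in for Redis `SET NX` + `GET` (the real implementation, including the 24h TTL shown in the architecture diagram, is assumed):

```python
import asyncio
from typing import Optional

class IdempotencyRegistry:
    """Maps idempotency_key -> command_id and cached result.

    Sketch only: dicts stand in for Redis SET NX / GET; the production
    registry would also attach a 24h TTL to each key.
    """

    def __init__(self):
        self._keys: dict[str, str] = {}      # idempotency_key -> command_id
        self._results: dict[str, str] = {}   # idempotency_key -> event_id

    async def check_and_register(self, key: str, command_id: str) -> bool:
        """Return True only for the first registration of a key (SET NX semantics)."""
        if key in self._keys:
            return False
        self._keys[key] = command_id
        return True

    async def get_result(self, key: str) -> Optional[str]:
        return self._results.get(key)

    async def store_result(self, key: str, event_id: str) -> None:
        self._results[key] = event_id

async def demo() -> tuple[bool, bool, Optional[str]]:
    reg = IdempotencyRegistry()
    first = await reg.check_and_register("enrich:domain-abc:whois", "cmd-1")
    await reg.store_result("enrich:domain-abc:whois", "event-42")
    second = await reg.check_and_register("enrich:domain-abc:whois", "cmd-2")
    cached = await reg.get_result("enrich:domain-abc:whois")
    return first, second, cached
```

The first registration succeeds; the duplicate is rejected and receives the cached event ID instead of re-executing the mutation.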

11. Anti-corruption layer

11.1 Boundary definition

The anti-corruption layer (ACL) is the single entry point where agent-produced data is validated, normalized, and sanitized before entering the graph. It extends flowsint-types' Pydantic validation with graph-specific invariants:

class GraphACL:
    """Anti-corruption layer between agents and the knowledge graph."""

    async def validate_command(self, command: GraphCommand) -> GraphCommand:
        """Validate and normalize a command before queuing."""

        # 1. Validate payload against flowsint-types Pydantic models
        if command.command_type in (CommandType.CREATE_NODE, CommandType.UPDATE_NODE):
            entity_type = command.payload.get("entity_type")
            model_class = ENTITY_REGISTRY.get(entity_type)
            if not model_class:
                raise InvalidEntityTypeError(entity_type)
            model_class.model_validate(command.payload.get("properties", {}))

        # 2. Ensure owner_id consistency
        if command.owner_id != self.current_user_id:
            raise OwnerMismatchError()

        # 3. Verify target nodes exist (for updates/deletes)
        if command.command_type in (CommandType.UPDATE_NODE, CommandType.DELETE_NODE):
            for node_id in command.target_node_ids:
                exists = await self.neo4j.node_exists(node_id, command.workspace_id)
                if not exists:
                    raise NodeNotFoundError(node_id)

        # 4. Prevent self-referencing edges
        if command.command_type == CommandType.CREATE_EDGE:
            if command.payload.get("source_id") == command.payload.get("target_id"):
                raise SelfReferenceError()

        # 5. Sanitize string properties (prevent Cypher injection)
        command.payload = self.sanitize_payload(command.payload)

        return command

11.2 Where it sits in the pipeline

Agent output → GraphACL.validate_command() → CommandQueue.enqueue() → ConductorExecutor

The ACL rejects invalid commands before they enter the queue, keeping the queue clean and reducing unnecessary lock acquisitions.
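Step 5 of `validate_command` calls `sanitize_payload`, which this RFC references but does not define. One possible sketch is below — note that parameterized Cypher queries remain the primary injection defense, so this pass only normalizes agent-produced strings; the length bound and character policy are illustrative assumptions:

```python
from typing import Any

MAX_STRING_LEN = 10_000  # illustrative bound, not specified by the RFC

def sanitize_string(value: str) -> str:
    """Strip non-printable control characters and clamp length."""
    cleaned = "".join(ch for ch in value if ch.isprintable() or ch in "\n\t")
    return cleaned[:MAX_STRING_LEN]

def sanitize_payload(payload: Any) -> Any:
    """Recursively sanitize every string in a command payload.

    Parameterized Cypher is the real injection defense; this pass just
    normalizes strings before they enter the command queue.
    """
    if isinstance(payload, str):
        return sanitize_string(payload)
    if isinstance(payload, dict):
        return {sanitize_string(k): sanitize_payload(v) for k, v in payload.items()}
    if isinstance(payload, list):
        return [sanitize_payload(v) for v in payload]
    return payload
```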


12. API surface additions

12.1 New REST endpoints

POST   /api/workspace/{workspace_id}/conductor/commands                  — Submit a command (or batch)
GET    /api/workspace/{workspace_id}/conductor/commands/{id}             — Check command status
GET    /api/workspace/{workspace_id}/conductor/events                    — List events (with filtering)
POST   /api/workspace/{workspace_id}/conductor/events/{event_id}/revert  — Revert a specific event
POST   /api/workspace/{workspace_id}/conductor/revert/{correlation_id}   — Revert an entire enrichment run
GET    /api/workspace/{workspace_id}/conductor/timeline                  — Investigation timeline
GET    /api/workspace/{workspace_id}/conductor/dlq                       — Dead letter queue listing
POST   /api/workspace/{workspace_id}/conductor/dlq/{id}/retry            — Retry a dead-lettered command
WS     /api/workspace/{workspace_id}/conductor/events/stream             — WebSocket for real-time events

12.2 WebSocket event stream

Real-time graph updates for the frontend's Sigma.js visualization. The frontend subscribes via WebSocket and receives events as they are committed:

{
  "event_type": "NODE_CREATED",
  "event_id": "abc-123",
  "node_id": "domain-xyz",
  "entity_type": "Domain",
  "agent_id": "enrichment-agent-01",
  "timestamp": "2026-03-26T14:30:00Z",
  "correlation_id": "enrichment-run-456"
}

13. Migration strategy

13.1 Phased rollout

Phase 1 — Foundation (non-breaking). Add conductor/ module to flowsint-core. Run Neo4j migration to add _version properties. Create PostgreSQL graph_events and dead_letter_commands tables. Deploy Conductor Executor alongside existing Celery worker.

Phase 2 — Opt-in adoption. Introduce ConductorAwareTransform base class. Migrate 3-5 high-concurrency enrichers (e.g., domain_to_subdomains, domain_to_ip) to emit commands instead of direct writes. Legacy enrichers continue working unchanged.

Phase 3 — Default path. Make ConductorAwareTransform the default base class. Add conductor middleware to TransformOrchestrator that auto-wraps legacy postprocess() calls in commands. Expose revert UI in the frontend.

Phase 4 — Multi-agent support. Add agent registration, role assignment, and the WebSocket event stream. Enable concurrent agent operation on the same workspace.

13.2 Backward compatibility

The TransformOrchestrator detects which base class an enricher extends:

if isinstance(enricher, ConductorAwareTransform):
    commands = await enricher.postprocess(scan_results)
    # Commands go through Conductor pipeline
else:
    # Legacy path: wrap the direct write in a command transparently
    command = self.conductor.wrap_legacy_write(enricher, scan_results)
    await self.conductor.enqueue(command)

This means zero existing enrichers need modification in Phase 1-2. Third-party and community enrichers (like the HudsonRock integration in PR #129) continue working.
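The `wrap_legacy_write` helper referenced above is not specified in this RFC. One plausible sketch follows; field names mirror the GraphCommand model from §3.2, with a plain dict standing in for the Pydantic model, and the deterministic idempotency key means re-running the same enricher on the same scan output becomes a no-op at the Conductor:

```python
import hashlib
import json
import uuid
from datetime import datetime, timezone

def wrap_legacy_write(enricher_name: str, workspace_id: str,
                      scan_results: dict) -> dict:
    """Wrap a legacy enricher's direct-write output in a command envelope.

    Sketch only: the idempotency key is derived from the scan results so
    that retried Celery tasks cannot apply the same mutation twice.
    """
    digest = hashlib.sha256(
        json.dumps(scan_results, sort_keys=True).encode()
    ).hexdigest()[:16]
    return {
        "command_id": str(uuid.uuid4()),
        "command_type": "batch_mutation",
        "workspace_id": workspace_id,
        "agent_id": f"legacy:{enricher_name}",
        "agent_role": "enrichment",
        "idempotency_key": f"legacy:{enricher_name}:{digest}",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "payload": scan_results,
    }
```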


14. Performance considerations

14.1 Overhead budget

The Conductor adds latency to each mutation. Target overhead budget per command:

| Step                  | Target latency      | Mechanism                  |
|-----------------------|---------------------|----------------------------|
| Queue enqueue         | < 1ms               | Redis ZADD                 |
| Idempotency check     | < 1ms               | Redis GET                  |
| Permission check      | < 0.1ms             | In-memory lookup           |
| Lock acquisition      | < 5ms (uncontended) | Redis SET NX               |
| Version check         | < 2ms               | Neo4j property read        |
| Before-state snapshot | < 10ms              | Neo4j MATCH by ID          |
| Mutation execution    | Variable            | Existing enricher latency  |
| After-state snapshot  | < 10ms              | Neo4j MATCH by ID          |
| Event write           | < 5ms               | PostgreSQL INSERT          |
| Lock release          | < 1ms               | Redis Lua script           |
| **Total overhead**    | **< 35ms**          | Excluding mutation itself  |

For enrichers whose scan() phase takes 1-10 seconds (external API calls), 35ms of overhead is negligible — at most 3.5% of total enrichment time.

14.2 Throughput targets

With Redis queue and node-level locking, the Conductor supports:

  • 100+ concurrent agents per workspace (limited by distinct graph regions)
  • 1,000+ commands/second throughput per workspace partition
  • Event store write throughput limited by PostgreSQL (~10,000 inserts/second with batching)

14.3 Event store compaction

For long-running investigations, the event store grows continuously. A background compaction job periodically creates checkpoint snapshots and marks pre-checkpoint events as compacted (queryable but not individually revertible):

-- Compaction: checkpoint any workspace that has accumulated more than
-- 1000 events since its last checkpoint (graph_state_snapshot() is a
-- helper that materializes the current workspace graph as JSONB)
INSERT INTO graph_checkpoints (workspace_id, checkpoint_at, full_state)
SELECT e.workspace_id, NOW(), graph_state_snapshot(e.workspace_id)
FROM graph_events e
LEFT JOIN LATERAL (
    SELECT max(c.checkpoint_at) AS last_at
    FROM graph_checkpoints c
    WHERE c.workspace_id = e.workspace_id
) last ON TRUE
WHERE e.timestamp > COALESCE(last.last_at, '-infinity'::timestamptz)
GROUP BY e.workspace_id
HAVING count(*) > 1000;

15. Testing strategy

15.1 Unit tests

Each conductor component is independently testable with mocked Redis and Neo4j:

  • test_command_queue.py — Enqueue/dequeue ordering, priority, TTL expiry
  • test_region_lock.py — Lock acquisition, sorted-order deadlock prevention, TTL expiry
  • test_version_vector.py — Check-and-increment atomicity, conflict detection
  • test_event_store.py — Event recording, revert generation, correlation queries
  • test_permissions.py — Role matrix enforcement, entity-type restrictions
  • test_idempotency.py — Duplicate detection, cached result retrieval
  • test_dead_letter.py — Failure capture, retry, inspection

15.2 Integration tests

  • Concurrent enrichment test: Two enrichers mutating the same node simultaneously — verify version conflict is detected and one retries
  • Revert chain test: Execute 5 enrichments, revert the 3rd, verify graph state matches expected
  • Dead letter test: Submit command with wrong permissions, verify it lands in DLQ
  • Legacy compatibility test: Run existing enricher through Conductor wrapper, verify identical Neo4j output
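The concurrent-enrichment case above can be sketched at unit scale against an in-memory stand-in for the version vector (the real integration test would run two enrichers against Neo4j):

```python
import asyncio

class InMemoryVersionVector:
    """Stand-in for the Neo4j-backed check-and-increment in §5.1."""

    def __init__(self):
        self.versions: dict[str, int] = {}

    async def check_and_increment(self, node_id: str, expected: int) -> bool:
        current = self.versions.get(node_id, 0)
        if current != expected:
            return False  # stale read -> version conflict
        self.versions[node_id] = current + 1
        return True

async def concurrent_enrichment() -> tuple[bool, bool, int]:
    vv = InMemoryVersionVector()
    # Both "agents" read the node at version 0, then try to write.
    a_ok = await vv.check_and_increment("domain-abc", expected=0)
    b_ok = await vv.check_and_increment("domain-abc", expected=0)  # stale
    # Agent B receives VERSION_CONFLICT, rebases onto version 1, retries.
    if not b_ok:
        b_ok = await vv.check_and_increment("domain-abc", expected=1)
    return a_ok, b_ok, vv.versions["domain-abc"]
```

Agent A's write lands first, Agent B's stale write is rejected, and B's rebased retry succeeds, leaving the node at version 2 — the behavior the integration test asserts.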

15.3 Load tests

  • 50 concurrent agents enriching a 10,000-node graph for 10 minutes
  • Measure: command throughput, lock contention rate, version conflict rate, event store growth, p99 latency

16. Open questions and future work

Distributed Conductor. This RFC assumes a single Conductor Executor instance per workspace partition. For horizontal scaling across multiple machines, the lock mechanism would need to migrate from standalone Redis to Redlock (the Redis distributed lock algorithm) or a dedicated coordination service such as etcd.

Graph-level ACID transactions. Neo4j supports multi-statement transactions, but the current enricher pipeline does not use them consistently. A future enhancement could make each command execute within a single Neo4j transaction, providing true atomicity at the database level in addition to application-level coordination.

Agent negotiation protocol. When two agents need the same region, the current design uses simple backoff-and-retry. A future version could implement agent-to-agent negotiation where a validation agent can signal an enrichment agent to pause, reducing wasted retries.

Frontend integration. The revert UI in the frontend (showing investigation timeline, per-event undo buttons, enrichment run grouping) is out of scope for this RFC but is the natural next step after Phase 3.

Conflict-free replicated data types (CRDTs). For properties that are inherently mergeable (e.g., tag sets, confidence scores), CRDT-based conflict resolution could eliminate version conflicts entirely for specific property types.
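For the tag-set example, the relevant CRDT is a grow-only set (G-Set), whose merge is plain set union — commutative, associative, and idempotent, so concurrent tag additions from different agents can never conflict. A one-function illustration:

```python
def merge_tag_sets(local: set[str], remote: set[str]) -> set[str]:
    """G-Set merge: union is commutative, associative, and idempotent,
    so concurrent tag additions converge without version conflicts.

    Illustrative only; deletions would require a more elaborate CRDT
    (e.g., an OR-Set), which is part of the future work above.
    """
    return local | remote
```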


17. Decision log

| Decision | Rationale | Alternatives considered |
|----------|-----------|-------------------------|
| Redis for command queue | Already in stack (Celery broker); sub-ms latency; sorted sets provide priority ordering | Kafka (overkill for single-node); PostgreSQL LISTEN/NOTIFY (higher latency) |
| PostgreSQL for event store | Already in stack; strong ACID guarantees for audit log; JSONB for flexible payloads | Dedicated event store (EventStoreDB — adds operational complexity); Neo4j itself (mixing concerns) |
| Node-level lock granularity | Maximizes concurrency; most enrichers touch disjoint nodes | Edge-level (too fine-grained, high overhead); workspace-level (too coarse, serializes all work) |
| Sorted-order lock acquisition | Simple deadlock prevention without detection algorithms | Wait-die/wound-wait (more complex); timeout-only (risks livelock) |
| Version vectors as integers | Simple; sufficient for single-writer-per-node-at-a-time model | Vector clocks (needed for distributed systems, overkill here); Lamport timestamps |
| Compensating commands for revert | Operation-agnostic; works with existing Neo4j driver | Neo4j time-travel (not supported natively); CQRS event replay (requires full projection rebuild) |

Appendix A: Superpowers compatibility

This RFC is designed to work with the superpowers agentic skills framework. A companion SKILL.md can be created at skills/graph-conductor/SKILL.md:

---
name: graph-conductor
description: Use when implementing or modifying graph mutations in flowsint — ensures all Neo4j writes go through the Conductor coordination layer with proper commands, locking, versioning, and event sourcing.
---

The skill would guide Claude agents to:

  1. Never write to Neo4j directly — always emit GraphCommand objects
  2. Declare target regions before starting work (claim-before-work)
  3. Include expected_versions for optimistic concurrency
  4. Generate deterministic idempotency_key values
  5. Handle VERSION_CONFLICT and REGION_BUSY responses gracefully

Appendix B: Glossary

| Term | Definition |
|------|------------|
| Graph Conductor | The coordination layer managing concurrent graph mutations |
| GraphCommand | An immutable, serializable description of a graph mutation |
| Region | A set of node/edge IDs that a command reads or writes |
| Version vector | An integer version counter on each node/edge, used for conflict detection |
| Event | An immutable record of a command's execution, including before/after state |
| Correlation ID | Groups all commands/events from a single enrichment run |
| Causation ID | Links a command to the command that triggered it |
| Dead letter queue | Storage for commands that failed after all retries |
| ACL (Anti-Corruption Layer) | Validation boundary between agent output and the command queue |
| Compensating command | A command that reverses the effect of a previous command |

This RFC is a living document. Feedback welcome via GitHub issues on reconurge/flowsint referencing this design.

Originally created by @gustavorps on GitHub (Mar 26, 2026). Original GitHub issue: https://github.com/reconurge/flowsint/issues/133 # RFC: Graph Conductor - Concurrency, Locking, and Coordination for Multi-Agent Graph Mutations in Flowsint v2 ``` RFC: graph-conductor Title: Graph Conductor — Multi-Agent Coordination Layer for the Knowledge Graph Author: Gustavo R. P. de Souza <@gustavorps> Status: Draft Created: 2026-03-26 Updated: 2026-03-26 Related: reconurge/flowsint#127 (Agnostic Revert Feature) reconurge/flowsint#109 (Undo enrichment) reconurge/flowsint#130 (Plugin Discovery and Namespace Code Structure) Superpowers: compatible (docs/plans/2026-03-26-graph-conductor-design.md) ``` --- ## Summary Flowsint's knowledge graph currently allows Celery workers to write nodes and edges to Neo4j without coordination, locking, or rollback capability. As the platform evolves toward multi-agent enrichment — where multiple Claude agents simultaneously populate, enrich, expand, and delete graph regions — this lack of coordination creates race conditions, phantom reads, conflicting writes, and irrecoverable state corruption. **Graph Conductor** is a new coordination layer inside `flowsint-core` that sits between agents/enrichers and Neo4j. It provides a **_command queue_**, **_graph region locking_**, **_optimistic concurrency control via version vectors_**, **_event sourcing for full reversibility_**, a **_claim-before-work protocol_**, **_role-scoped agent permissions_**, **_idempotent command execution_**, and a **_dead letter queue_** for failed mutations. Together, these mechanisms make the knowledge graph safe for concurrent multi-agent operation while enabling the "Agnostic Revert" capability requested in issue #127. --- ## 1. 
Motivation and problem statement ### 1.1 Current architecture gaps Flowsint's existing mutation path is straightforward but unsafe under concurrency: ``` Agent/Enricher → Celery.delay() → Redis Queue → Worker → Transform.postprocess() → Neo4j WRITE ``` Three specific problems arise when multiple agents operate simultaneously: **Lost updates.** Agent A reads a `Domain` node, enriches it with WHOIS data, and writes back. Agent B reads the same node before A's write, enriches it with DNS data, and writes back — overwriting A's WHOIS additions. Neo4j has no application-level optimistic concurrency control; the last writer wins silently. **Phantom graph regions.** Agent A discovers 50 subdomains and begins creating nodes. Agent B starts a cleanup task that deletes "stale" nodes in the same workspace. B deletes nodes A is still linking to, leaving orphaned edges pointing at nonexistent targets. **Irreversible mutations.** Issue #109 requests undo-enrichment capability. Issue #127 proposes an "Agnostic Revert Feature" — a mechanism to reverse any graph operation regardless of type. Today, once `postprocess()` writes to Neo4j, there is no record of what changed and no way to reverse it. The three-phase pipeline (preprocess → scan → postprocess) lacks a fourth phase: *record what happened so it can be undone*. ### 1.2 Why this matters for multi-agent OSINT The move toward AI-powered investigation means multiple Claude agents will operate on the same investigation graph concurrently: - An **Enrichment Agent** runs domain-to-subdomains while an **Attribution Agent** runs WHOIS lookups on the same domain - A **Expansion Agent** discovers new IP ranges while a **Validation Agent** prunes false-positive nodes - A **Triage Agent** tags high-priority nodes while a **Reporting Agent** reads the graph for export Without coordination, these agents corrupt each other's work. Graph Conductor solves this. 
### 1.3 Relationship to existing issues | Issue | Title | How Graph Conductor addresses it | |-------|-------|----------------------------------| | #127 | [WiP] RFC: Agnostic Revert Feature | Event sourcing provides operation-type-agnostic undo for any graph mutation | | #109 | [Feature Request] Undo enrichment | Command journaling enables per-enrichment rollback via compensating commands | | #130 | [PROPOSAL] Plugin Discovery and Namespace | Role-scoped permissions integrate with plugin namespace boundaries | | #106 | [Feature Request] Add timestamps of events | Event sourcing naturally timestamps every mutation | | #94 | Configurable API-Based Enricher Type | Idempotent commands make retry-safe enrichers possible for unreliable external APIs | --- ## 2. Architecture overview Graph Conductor introduces a new module, `flowsint-core/src/flowsint/core/conductor/`, that intercepts all graph mutations before they reach Neo4j. It does not replace the existing Transform pipeline; it wraps `postprocess()` with coordination guarantees. 
### 2.1 System context ```mermaid graph TD subgraph Agents["Claude Agent Team"] A1["🔍 Enrichment Agent<br/><i>role: enrichment</i>"] A2["🌐 Expansion Agent<br/><i>role: expansion</i>"] A3["✅ Validation Agent<br/><i>role: validation</i>"] A4["🧹 Cleanup Agent<br/><i>role: cleanup</i>"] A5["🏷️ Triage Agent<br/><i>role: triage</i>"] end subgraph Conductor["Graph Conductor — flowsint-core/conductor"] direction TB ACL["Anti-Corruption Layer<br/><i>Validate · Normalize · Sanitize</i>"] subgraph Queue["Command Queue — Redis Sorted Sets"] Q1["Workspace A Queue<br/><i>priority-ordered</i>"] Q2["Workspace B Queue<br/><i>priority-ordered</i>"] Q3["Workspace N Queue<br/><i>priority-ordered</i>"] end IDEM["Idempotency Registry<br/><i>Redis SET NX · 24h TTL</i>"] PERM["Permission Check<br/><i>Role → CommandType matrix</i>"] subgraph Locking["Graph Region Locking — Redis"] RL["Node-Level Locks<br/><i>SET NX EX · sorted acquisition</i>"] SL["Workspace-Level Locks<br/><i>bulk operations only</i>"] end subgraph Executor["Conductor Executor"] VV["Version Vector Check<br/><i>WHERE n._version = $expected</i>"] SNAP_B["Snapshot before_state"] EXEC["Execute Cypher Mutation"] SNAP_A["Snapshot after_state"] VV --> SNAP_B --> EXEC --> SNAP_A end end subgraph Storage["Persistence Layer"] NEO["Neo4j 5 + APOC<br/><i>Knowledge Graph</i>"] PG["PostgreSQL<br/><i>Event Store · DLQ</i>"] end subgraph Outputs["Downstream"] WS["WebSocket Stream<br/><i>Real-time frontend events</i>"] DLQ["Dead Letter Queue<br/><i>Failed commands</i>"] PUBSUB["Redis Pub/Sub<br/><i>conductor:events:{workspace_id}</i>"] end A1 & A2 & A3 & A4 & A5 -->|"POST /api/.../conductor/commands"| ACL ACL -->|"validated commands"| Queue Queue --> IDEM IDEM -->|"new command"| PERM IDEM -.->|"duplicate → cached result"| Agents PERM -->|"authorized"| Locking PERM -.->|"denied"| DLQ Locking -->|"lock acquired"| Executor Locking -.->|"lock timeout after retries"| DLQ EXEC -->|"Cypher + version bump"| NEO SNAP_A -->|"record event"| PG 
SNAP_A --> PUBSUB PUBSUB --> WS DLQ --> PG style Agents fill:transparent,stroke:#7F77DD,stroke-width:1.5px style Conductor fill:transparent,stroke:#D85A30,stroke-width:1.5px style Queue fill:transparent,stroke:#1D9E75,stroke-width:1px style Locking fill:transparent,stroke:#BA7517,stroke-width:1px style Executor fill:transparent,stroke:#185FA5,stroke-width:1px style Storage fill:transparent,stroke:#5F5E5A,stroke-width:1.5px style Outputs fill:transparent,stroke:#993556,stroke-width:1.5px style A1 fill:#EEEDFE,stroke:#534AB7,color:#3C3489 style A2 fill:#EEEDFE,stroke:#534AB7,color:#3C3489 style A3 fill:#EEEDFE,stroke:#534AB7,color:#3C3489 style A4 fill:#EEEDFE,stroke:#534AB7,color:#3C3489 style A5 fill:#EEEDFE,stroke:#534AB7,color:#3C3489 style ACL fill:#FAECE7,stroke:#993C1D,color:#712B13 style Q1 fill:#E1F5EE,stroke:#0F6E56,color:#085041 style Q2 fill:#E1F5EE,stroke:#0F6E56,color:#085041 style Q3 fill:#E1F5EE,stroke:#0F6E56,color:#085041 style IDEM fill:#E1F5EE,stroke:#0F6E56,color:#085041 style PERM fill:#FAECE7,stroke:#993C1D,color:#712B13 style RL fill:#FAEEDA,stroke:#854F0B,color:#633806 style SL fill:#FAEEDA,stroke:#854F0B,color:#633806 style VV fill:#E6F1FB,stroke:#185FA5,color:#0C447C style SNAP_B fill:#E6F1FB,stroke:#185FA5,color:#0C447C style EXEC fill:#E6F1FB,stroke:#185FA5,color:#0C447C style SNAP_A fill:#E6F1FB,stroke:#185FA5,color:#0C447C style NEO fill:#EAF3DE,stroke:#3B6D11,color:#27500A style PG fill:#F1EFE8,stroke:#5F5E5A,color:#444441 style WS fill:#FBEAF0,stroke:#993556,color:#72243E style DLQ fill:#FCEBEB,stroke:#A32D2D,color:#791F1F style PUBSUB fill:#FBEAF0,stroke:#993556,color:#72243E ``` ### 2.2 Module placement Graph Conductor lives inside `flowsint-core` and depends only on `flowsint-types`. 
It does not depend on `flowsint-enrichers` or `flowsint-api`, preserving the existing dependency hierarchy: ``` flowsint-types (unchanged — Pydantic entity models) ↑ flowsint-core/conductor (NEW — this RFC) ↑ flowsint-core (existing — orchestrator, tasks, vault, DB connectors) ↑ flowsint-enrichers (unchanged — enricher implementations) ↑ flowsint-api (unchanged — FastAPI routes) ``` ### 2.3 New directory structure ``` flowsint-core/src/flowsint/core/conductor/ ├── __init__.py ├── commands.py # GraphCommand base class + concrete command types ├── command_queue.py # Redis-backed FIFO command queue ├── region_lock.py # Graph region locking with Redis distributed locks ├── version_vector.py # Per-node version vector implementation ├── executor.py # ConductorExecutor — the core orchestration loop ├── event_store.py # PostgreSQL-backed event journal ├── event_types.py # Event schema definitions (Pydantic) ├── permissions.py # Role-scoped agent permission checks ├── dead_letter.py # Dead letter queue for failed commands ├── idempotency.py # Idempotency key registry └── anti_corruption.py # ACL boundary between agents and Neo4j ``` --- ## 3. Command queue pattern ### 3.1 Design Every graph mutation is expressed as a **GraphCommand** — a serializable, immutable data object that describes *what* should happen without executing it. Commands enter a Redis-backed FIFO queue partitioned by workspace ID, ensuring that mutations within a single investigation are ordered while mutations across investigations execute in parallel. ### 3.2 Command types ```python from enum import Enum from pydantic import BaseModel, Field from datetime import datetime from typing import Optional import uuid class CommandType(str, Enum): CREATE_NODE = "create_node" UPDATE_NODE = "update_node" DELETE_NODE = "delete_node" CREATE_EDGE = "create_edge" UPDATE_EDGE = "update_edge" DELETE_EDGE = "delete_edge" BATCH_MUTATION = "batch_mutation" class GraphCommand(BaseModel): """Immutable graph mutation command. 
Created by agents, executed by Conductor.""" command_id: str = Field(default_factory=lambda: str(uuid.uuid4())) command_type: CommandType workspace_id: str owner_id: str agent_id: str agent_role: str # e.g., "enrichment", "validation", "expansion" idempotency_key: str # Client-generated; prevents duplicate execution timestamp: datetime = Field(default_factory=datetime.utcnow) target_node_ids: list[str] = [] # Nodes this command reads or writes target_edge_ids: list[str] = [] # Edges this command reads or writes expected_versions: dict[str, int] = {} # node_id → expected version for OCC payload: dict # Command-specific data (entity type, properties, etc.) priority: int = 0 # Higher = processed first within queue partition ttl_seconds: int = 300 # Command expires if not executed within TTL causation_id: Optional[str] = None # ID of the command that caused this one correlation_id: Optional[str] = None # Groups related commands (e.g., one enrichment run) ``` ### 3.3 Queue mechanics Commands are enqueued to Redis sorted sets keyed by `conductor:queue:{workspace_id}`, scored by `(priority, timestamp)`. The Conductor Executor polls queues using `BZPOPMIN` for blocking dequeue with automatic priority ordering. 
```python class CommandQueue: """Redis-backed priority command queue, partitioned by workspace.""" QUEUE_PREFIX = "conductor:queue:" async def enqueue(self, command: GraphCommand) -> None: key = f"{self.QUEUE_PREFIX}{command.workspace_id}" score = (-command.priority * 1e12) + command.timestamp.timestamp() await self.redis.zadd(key, {command.model_dump_json(): score}) async def dequeue(self, workspace_id: str, timeout: float = 5.0) -> GraphCommand | None: key = f"{self.QUEUE_PREFIX}{workspace_id}" result = await self.redis.bzpopmin(key, timeout=timeout) if result: _, payload, _ = result return GraphCommand.model_validate_json(payload) return None ``` ### 3.4 Integration with existing Transform pipeline The current `Transform.postprocess()` method writes directly to Neo4j. Graph Conductor wraps this by introducing `ConductorAwareTransform`: ```python class ConductorAwareTransform(Transform): """Transform subclass that routes mutations through Graph Conductor.""" async def postprocess(self, scan_results: dict) -> list[GraphCommand]: """Instead of writing to Neo4j directly, emit GraphCommands.""" commands = self.build_commands(scan_results) for cmd in commands: await self.conductor.enqueue(cmd) return commands def build_commands(self, scan_results: dict) -> list[GraphCommand]: """Subclasses override to produce commands from scan output.""" raise NotImplementedError ``` Existing enrichers continue to work unchanged — the `TransformOrchestrator` detects whether the enricher extends `ConductorAwareTransform` or the legacy `Transform` and routes accordingly. This provides a non-breaking migration path. --- ## 4. Graph region locking ### 4.1 Region definition A **graph region** is a set of node IDs and edge IDs that a command intends to read or write. Regions are declared upfront in `target_node_ids` and `target_edge_ids` on each `GraphCommand`. This is the **claim-before-work** pattern: agents must declare their intent before execution begins. 
### 4.2 Lock granularity

The locking system operates at **two granularities**:

| Granularity | Lock key pattern | Use case |
|-------------|-----------------|----------|
| **Node-level** | `lock:node:{node_id}` | Single-node enrichments, property updates |
| **Workspace-level** | `lock:workspace:{workspace_id}` | Bulk operations, graph-wide cleanup/export |

Node-level locks use Redis `SET NX EX` (set-if-not-exists with expiry) for distributed mutual exclusion. Workspace-level locks are acquired as a Redis `SETNX` on the workspace key and block all node-level locks within that workspace.

### 4.3 Lock acquisition protocol

```python
class RegionLock:
    """Distributed graph region lock using Redis."""

    LOCK_TTL = 30  # seconds; auto-expires to prevent deadlocks
    RETRY_DELAY = 0.1
    MAX_RETRIES = 50

    async def acquire(self, command: GraphCommand) -> bool:
        """Acquire locks for all nodes/edges in the command's region.

        Uses sorted-order acquisition to prevent deadlocks."""
        targets = sorted(command.target_node_ids + command.target_edge_ids)
        acquired = []
        try:
            for target_id in targets:
                key = f"lock:node:{target_id}"
                for attempt in range(self.MAX_RETRIES):
                    if await self.redis.set(key, command.command_id, nx=True, ex=self.LOCK_TTL):
                        acquired.append(key)
                        break
                    await asyncio.sleep(self.RETRY_DELAY * (attempt + 1))
                else:
                    # Failed to acquire — release all and retry entire command
                    await self._release_all(acquired, command.command_id)
                    return False
            return True
        except Exception:
            await self._release_all(acquired, command.command_id)
            raise

    async def release(self, command: GraphCommand) -> None:
        """Release all locks held by this command."""
        targets = sorted(command.target_node_ids + command.target_edge_ids)
        lock_keys = [f"lock:node:{tid}" for tid in targets]
        await self._release_all(lock_keys, command.command_id)

    async def _release_all(self, keys: list[str], owner: str) -> None:
        """Release locks only if still owned by this command (compare-and-delete)."""
        for key in keys:
            # Lua script for atomic compare-and-delete
            await self.redis.eval(
                "if redis.call('get', KEYS[1]) == ARGV[1] then "
                "return redis.call('del', KEYS[1]) else return 0 end",
                1, key, owner
            )
```

**Deadlock prevention**: Locks are always acquired in sorted order of target IDs. Combined with TTL-based auto-expiry, this eliminates deadlock scenarios entirely.

### 4.4 Claim-before-work pattern

Agents must declare the graph region they intend to mutate *before* performing any external OSINT calls. This happens during the enricher's `preprocess()` phase:

```
1. Agent receives task (e.g., "enrich domain example.com with subdomains")
2. preprocess() identifies target: node_id="domain-abc123"
3. Agent issues a CLAIM command declaring intent to write to region {domain-abc123}
4. Conductor validates the claim against the agent's role permissions
5. Conductor acquires region lock
6. Agent proceeds with scan() — external API calls
7. Agent issues mutation commands within the claimed region
8. Conductor executes commands and releases lock
```

If a claim cannot be granted (region already locked), the agent receives a `REGION_BUSY` response with the lock holder's agent ID and estimated completion time. The agent can then choose to wait, skip, or escalate.

---

## 5. Optimistic concurrency control with version vectors

### 5.1 Version vector design

Every node and edge in the Neo4j graph carries a `_version` integer property. When an agent reads a node, it captures the current version. When it submits a mutation command, it includes `expected_versions` mapping each target to the version it read. The Conductor checks these versions at execution time.

```python
class VersionVector:
    """Manages per-entity version tracking in Neo4j."""

    async def check_and_increment(
        self, neo4j_session, node_id: str, expected_version: int
    ) -> bool:
        """Atomically check version and increment. Returns False on conflict."""
        result = await neo4j_session.run(
            """
            MATCH (n {id: $node_id})
            WHERE n._version = $expected_version
            SET n._version = $expected_version + 1
            RETURN n._version AS new_version
            """,
            node_id=node_id,
            expected_version=expected_version,
        )
        record = await result.single()
        return record is not None
```

### 5.2 Conflict resolution strategy

When a version mismatch occurs:

1. **Default: Reject and retry.** The command is returned to the queue with an incremented retry counter. The agent receives a `VERSION_CONFLICT` event containing the current node state, allowing it to rebase its changes.
2. **Merge mode (opt-in).** For additive-only operations (e.g., appending new properties without overwriting existing ones), the Conductor can auto-merge by applying a **last-writer-wins per-property** strategy. The agent opts in by setting `payload.merge_strategy = "additive"`.
3. **Force mode (privileged).** Agents with the `admin` role can force-write, bypassing version checks. This is logged as a `FORCE_WRITE` event for audit.

### 5.3 Neo4j migration

A migration in `neo4j-migrations/` adds the `_version` property to all existing nodes:

```cypher
// Migration: 001_add_version_vectors.cypher
MATCH (n)
WHERE n._version IS NULL
SET n._version = 0
RETURN count(n) AS nodes_versioned;

// Create index for version lookups
CREATE INDEX node_version_idx IF NOT EXISTS
FOR (n:Entity) ON (n._version);
```

---

## 6. Event sourcing

### 6.1 Event store design

Every command execution produces an **event** — an immutable record of what changed. Events are stored in PostgreSQL (alongside existing user/chat data) in an append-only `graph_events` table. This provides the foundation for the "Agnostic Revert" capability requested in issue #127.
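
Each event record stores a minimal `delta` alongside the full snapshots. As an illustrative sketch of how such a property-level diff could be computed, a `compute_delta` helper (the helper name and the flat snapshot shape are assumptions for illustration, not part of the implemented API) might look like:

```python
def compute_delta(before: dict, after: dict) -> dict:
    """Property-level diff between two entity snapshots.

    NOTE: illustrative sketch; assumes snapshots are flat property maps.
    Returns {"added", "removed", "changed"} where "changed" maps each
    property to its old and new value.
    """
    added = {k: v for k, v in after.items() if k not in before}
    removed = {k: v for k, v in before.items() if k not in after}
    changed = {
        k: {"from": before[k], "to": after[k]}
        for k in before.keys() & after.keys()
        if before[k] != after[k]
    }
    return {"added": added, "removed": removed, "changed": changed}
```

Storing only this diff keeps `delta` compact, while `before_state`/`after_state` remain the authoritative snapshots for revert.
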

```sql
CREATE TABLE graph_events (
    event_id        UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    command_id      UUID NOT NULL,
    command_type    VARCHAR(50) NOT NULL,
    workspace_id    UUID NOT NULL,
    owner_id        UUID NOT NULL,
    agent_id        VARCHAR(255) NOT NULL,
    agent_role      VARCHAR(50) NOT NULL,
    correlation_id  UUID,            -- Groups events from one enrichment run
    causation_id    UUID,            -- The command that triggered this event
    timestamp       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    target_node_ids TEXT[] NOT NULL DEFAULT '{}',
    target_edge_ids TEXT[] NOT NULL DEFAULT '{}',
    before_state    JSONB,           -- Snapshot of affected entities BEFORE mutation
    after_state     JSONB,           -- Snapshot of affected entities AFTER mutation
    delta           JSONB NOT NULL,  -- Minimal diff of what changed
    is_reverted     BOOLEAN NOT NULL DEFAULT FALSE,
    reverted_by     UUID,            -- Points to the compensating event
    metadata        JSONB DEFAULT '{}'
);

CREATE INDEX idx_events_workspace ON graph_events (workspace_id, timestamp DESC);
CREATE INDEX idx_events_correlation ON graph_events (correlation_id);
CREATE INDEX idx_events_node ON graph_events USING GIN (target_node_ids);
```

### 6.2 Event lifecycle

```
Command submitted
  → Conductor acquires lock
  → Version check passes
  → Snapshot before_state from Neo4j
  → Execute Cypher mutation
  → Snapshot after_state from Neo4j
  → Compute delta (before_state → after_state)
  → Write event to graph_events table
  → Release lock
  → Emit event to Redis Pub/Sub for real-time frontend updates
```

### 6.3 Agnostic revert (addressing issue #127)

Reverting any operation is **operation-type-agnostic** because every event contains `before_state`.
To revert:

```python
class EventStore:
    async def revert_event(self, event_id: str) -> GraphCommand:
        """Generate a compensating command that undoes an event."""
        event = await self.get_event(event_id)
        if event.is_reverted:
            raise AlreadyRevertedError(event_id)

        compensating_command = GraphCommand(
            command_type=self._inverse_type(event.command_type),
            workspace_id=event.workspace_id,
            owner_id=event.owner_id,
            agent_id="system:revert",
            agent_role="admin",
            idempotency_key=f"revert:{event.event_id}",
            payload=event.before_state,
            causation_id=event.command_id,
            correlation_id=event.correlation_id,
        )
        return compensating_command

    async def revert_correlation(self, correlation_id: str) -> list[GraphCommand]:
        """Revert ALL events from a single enrichment run, in reverse order."""
        events = await self.get_events_by_correlation(
            correlation_id, order="timestamp DESC"
        )
        return [await self.revert_event(e.event_id) for e in events]
```

**This directly solves issue #127** ("Agnostic Revert Feature") because:

- The revert mechanism works identically for node creates, updates, deletes, edge creates, and batch mutations
- `revert_correlation()` undoes an entire enrichment run (solving issue #109 "Undo enrichment")
- The `before_state`/`after_state` snapshots are self-contained — no enricher-specific logic needed
- Reverts are themselves events, creating a full audit trail

### 6.4 Time-travel queries

The event store enables investigation timeline reconstruction:

```python
async def graph_state_at(self, workspace_id: str, timestamp: datetime) -> dict:
    """Reconstruct graph state at any point in time by replaying events."""
    events = await self.get_events(workspace_id, before=timestamp, order="timestamp ASC")
    state = {}
    for event in events:
        if not event.is_reverted:
            self._apply_delta(state, event.delta)
    return state
```

---

## 7. Idempotent commands

### 7.1 Problem

External OSINT APIs are unreliable.
Network timeouts, Celery worker restarts, and agent retries can cause the same enrichment to be submitted multiple times. Without idempotency, a `domain_to_subdomains` enrichment could create duplicate nodes.

### 7.2 Idempotency key registry

Every `GraphCommand` carries an `idempotency_key` generated by the agent. The Conductor checks this key against a Redis-backed registry before execution:

```python
class IdempotencyRegistry:
    """Prevents duplicate command execution using client-generated keys."""

    KEY_PREFIX = "idempotency:"
    KEY_TTL = 86400  # 24 hours

    async def check_and_register(self, key: str, command_id: str) -> bool:
        """Returns True if this key is new (safe to execute). False if duplicate."""
        registered = await self.redis.set(
            f"{self.KEY_PREFIX}{key}", command_id, nx=True, ex=self.KEY_TTL
        )
        return registered is not None  # True = new key, False = duplicate

    async def get_result(self, key: str) -> dict | None:
        """Retrieve the cached result of a previously executed command."""
        result = await self.redis.get(f"{self.KEY_PREFIX}{key}:result")
        return json.loads(result) if result else None
```

### 7.3 Key generation convention

Agents generate idempotency keys using a deterministic hash of their intent:

```python
def make_idempotency_key(agent_id: str, enricher_name: str,
                         entity_id: str, params_hash: str) -> str:
    """Deterministic key: same agent + same enricher + same entity + same params = same key."""
    raw = f"{agent_id}:{enricher_name}:{entity_id}:{params_hash}"
    return hashlib.sha256(raw.encode()).hexdigest()
```

When a duplicate is detected, the Conductor returns the cached result of the original execution without re-running the command.

---

## 8. Role-scoped agent permissions

### 8.1 Agent roles

Each Claude agent operates under a declared role that constrains what graph mutations it can perform.
Roles are defined in a permission matrix:

| Role | CREATE nodes | UPDATE nodes | DELETE nodes | CREATE edges | DELETE edges | Bulk ops | Force write |
|------|:-----------:|:-----------:|:-----------:|:-----------:|:-----------:|:--------:|:-----------:|
| `enrichment` | ✅ | ✅ | ❌ | ✅ | ❌ | ❌ | ❌ |
| `validation` | ❌ | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ |
| `expansion` | ✅ | ❌ | ❌ | ✅ | ❌ | ❌ | ❌ |
| `cleanup` | ❌ | ❌ | ✅ | ❌ | ✅ | ✅ | ❌ |
| `triage` | ❌ | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ |
| `admin` | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| `readonly` | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ |

### 8.2 Permission enforcement

```python
class AgentPermissions:
    """Enforces role-based command authorization."""

    PERMISSION_MATRIX: dict[str, set[CommandType]] = {
        "enrichment": {CommandType.CREATE_NODE, CommandType.UPDATE_NODE, CommandType.CREATE_EDGE},
        "validation": {CommandType.UPDATE_NODE},
        "expansion": {CommandType.CREATE_NODE, CommandType.CREATE_EDGE},
        "cleanup": {CommandType.DELETE_NODE, CommandType.DELETE_EDGE, CommandType.BATCH_MUTATION},
        "triage": {CommandType.UPDATE_NODE},
        "admin": set(CommandType),  # All permissions
        "readonly": set(),          # No mutations allowed
    }

    def authorize(self, command: GraphCommand) -> bool:
        allowed = self.PERMISSION_MATRIX.get(command.agent_role, set())
        if command.command_type not in allowed:
            raise PermissionDeniedError(
                f"Agent role '{command.agent_role}' cannot execute '{command.command_type}'. "
                f"Allowed: {allowed}"
            )
        return True
```

### 8.3 Entity-type restrictions (optional extension)

Roles can be further restricted by entity type. For example, a `crypto_enrichment` agent might only be allowed to create `CryptoWallet` and `CryptoTransaction` nodes.
This maps naturally to flowsint-types' Pydantic model hierarchy:

```python
ENTITY_TYPE_RESTRICTIONS: dict[str, set[str]] = {
    "crypto_enrichment": {"CryptoWallet", "CryptoNFT", "CryptoTransaction"},
    "domain_enrichment": {"Domain", "IP", "ASN", "CIDR", "Website"},
    "social_enrichment": {"Individual", "SocialProfile", "Email", "Phone"},
}
```

---

## 9. Dead letter queue

### 9.1 Purpose

Commands that fail after all retries — due to persistent version conflicts, permission errors, Neo4j connectivity issues, or malformed payloads — are moved to a **dead letter queue** (DLQ) instead of being silently dropped.

### 9.2 Implementation

```python
class DeadLetterQueue:
    """Stores permanently failed commands for inspection and manual retry."""

    DLQ_KEY = "conductor:dlq:{workspace_id}"

    async def enqueue(self, command: GraphCommand, failure_reason: str, attempts: int) -> None:
        entry = {
            "command": command.model_dump(),
            "failure_reason": failure_reason,
            "attempts": attempts,
            "failed_at": datetime.utcnow().isoformat(),
        }
        await self.redis.lpush(
            self.DLQ_KEY.format(workspace_id=command.workspace_id), json.dumps(entry)
        )
        # Also write to PostgreSQL for durable storage
        await self.pg_pool.execute(
            """INSERT INTO dead_letter_commands
               (command_id, workspace_id, owner_id, command_data, failure_reason, attempts, failed_at)
               VALUES ($1, $2, $3, $4, $5, $6, NOW())""",
            command.command_id, command.workspace_id, command.owner_id,
            json.dumps(command.model_dump()), failure_reason, attempts,
        )

    async def inspect(self, workspace_id: str, limit: int = 50) -> list[dict]:
        """List failed commands for a workspace — exposed via API for debugging."""
        return await self.pg_pool.fetch(
            "SELECT * FROM dead_letter_commands "
            "WHERE workspace_id = $1 ORDER BY failed_at DESC LIMIT $2",
            workspace_id, limit,
        )

    async def retry(self, command_id: str) -> None:
        """Re-enqueue a dead-lettered command for another attempt."""
        record = await self.pg_pool.fetchrow(
            "SELECT command_data FROM dead_letter_commands WHERE command_id = $1",
            command_id
        )
        command = GraphCommand.model_validate_json(record["command_data"])
        command.idempotency_key = f"retry:{command.command_id}:{uuid.uuid4()}"
        await self.command_queue.enqueue(command)
```

### 9.3 DLQ API endpoints

New endpoints in `flowsint-api` for DLQ management:

```
GET    /api/workspace/{workspace_id}/conductor/dlq            — List dead-lettered commands
POST   /api/workspace/{workspace_id}/conductor/dlq/{id}/retry — Retry a specific command
DELETE /api/workspace/{workspace_id}/conductor/dlq/{id}       — Dismiss a dead-lettered command
```

---

## 10. Conductor executor — the orchestration loop

### 10.1 Execution flow

The Conductor Executor is the central loop that ties all components together. It runs as a Celery task (or standalone asyncio worker) alongside the existing Celery workers:

```python
class ConductorExecutor:
    """Core orchestration loop for Graph Conductor."""

    MAX_RETRIES = 3
    RETRY_BACKOFF = [1.0, 2.0, 5.0]

    async def run(self, workspace_id: str):
        """Main execution loop for a single workspace partition."""
        while True:
            command = await self.command_queue.dequeue(workspace_id)
            if not command:
                await asyncio.sleep(0.1)  # avoid busy-spinning on an empty queue
                continue
            try:
                await self.execute(command)
            except Exception as e:
                logger.error(f"Conductor error: {e}", extra={"command_id": command.command_id})

    async def execute(self, command: GraphCommand, attempt: int = 0) -> None:
        """Execute a single command with full coordination guarantees."""
        # 1. Idempotency check (first attempt only — internal retries re-enter
        #    execute() and would otherwise be misclassified as duplicates)
        if attempt == 0:
            is_new = await self.idempotency.check_and_register(
                command.idempotency_key, command.command_id
            )
            if not is_new:
                cached = await self.idempotency.get_result(command.idempotency_key)
                await self.notify_agent(command, "DUPLICATE", cached)
                return

        # 2. Permission check
        try:
            self.permissions.authorize(command)
        except PermissionDeniedError as e:
            await self.dead_letter.enqueue(command, str(e), attempt)
            await self.notify_agent(command, "PERMISSION_DENIED", str(e))
            return

        # 3. TTL check
        if command.is_expired():
            await self.dead_letter.enqueue(command, "EXPIRED", attempt)
            return

        # 4. Acquire region lock
        locked = await self.region_lock.acquire(command)
        if not locked:
            if attempt < self.MAX_RETRIES:
                await asyncio.sleep(self.RETRY_BACKOFF[attempt])
                await self.execute(command, attempt + 1)
            else:
                await self.dead_letter.enqueue(command, "LOCK_TIMEOUT", attempt)
            return

        try:
            # 5. Version vector check
            for node_id, expected in command.expected_versions.items():
                current = await self.version_vector.get_version(node_id)
                if current != expected:
                    if attempt < self.MAX_RETRIES:
                        await self.region_lock.release(command)
                        await self.notify_agent(command, "VERSION_CONFLICT", {
                            "node_id": node_id,
                            "expected": expected,
                            "actual": current,
                        })
                        return  # Agent must rebase and resubmit
                    else:
                        await self.dead_letter.enqueue(command, "VERSION_CONFLICT", attempt)
                        return

            # 6. Snapshot before_state
            before_state = await self.snapshot_targets(command)

            # 7. Execute Cypher mutation in Neo4j transaction
            async with self.neo4j.session() as session:
                async with session.begin_transaction() as tx:
                    await self.apply_mutation(tx, command)
                    # Increment versions for all written nodes
                    for node_id in command.target_node_ids:
                        await self.version_vector.increment(tx, node_id)
                    await tx.commit()

            # 8. Snapshot after_state
            after_state = await self.snapshot_targets(command)

            # 9. Write event to event store
            event = await self.event_store.record(
                command=command,
                before_state=before_state,
                after_state=after_state,
            )

            # 10. Cache result for idempotency
            await self.idempotency.store_result(command.idempotency_key, event.event_id)

            # 11. Publish event for real-time frontend updates
            await self.redis.publish(
                f"conductor:events:{command.workspace_id}", event.model_dump_json()
            )

            # 12. Notify agent of success
            await self.notify_agent(command, "SUCCESS", event.event_id)
        finally:
            # 13. Always release lock
            await self.region_lock.release(command)
```

### 10.2 Execution guarantees

| Property | Mechanism |
|----------|-----------|
| **At-most-once execution** | Idempotency key registry prevents duplicates |
| **Ordered within workspace** | Redis sorted set queue per workspace_id |
| **Parallel across workspaces** | Separate queue partitions processed concurrently |
| **No deadlocks** | Sorted lock acquisition + TTL auto-expiry |
| **Conflict detection** | Version vectors catch stale reads |
| **Full reversibility** | Event store captures before/after state |
| **Failure visibility** | Dead letter queue captures all terminal failures |
| **Permission enforcement** | Role check before lock acquisition (fail fast) |

---

## 11. Anti-corruption layer

### 11.1 Boundary definition

The anti-corruption layer (ACL) is the single entry point where agent-produced data is validated, normalized, and sanitized before entering the graph. It extends flowsint-types' Pydantic validation with graph-specific invariants:

```python
class GraphACL:
    """Anti-corruption layer between agents and the knowledge graph."""

    async def validate_command(self, command: GraphCommand) -> GraphCommand:
        """Validate and normalize a command before queuing."""
        # 1. Validate payload against flowsint-types Pydantic models
        if command.command_type in (CommandType.CREATE_NODE, CommandType.UPDATE_NODE):
            entity_type = command.payload.get("entity_type")
            model_class = ENTITY_REGISTRY.get(entity_type)
            if not model_class:
                raise InvalidEntityTypeError(entity_type)
            model_class.model_validate(command.payload.get("properties", {}))

        # 2. Ensure owner_id consistency
        if command.owner_id != self.current_user_id:
            raise OwnerMismatchError()

        # 3. Verify target nodes exist (for updates/deletes)
        if command.command_type in (CommandType.UPDATE_NODE, CommandType.DELETE_NODE):
            for node_id in command.target_node_ids:
                exists = await self.neo4j.node_exists(node_id, command.workspace_id)
                if not exists:
                    raise NodeNotFoundError(node_id)

        # 4. Prevent self-referencing edges
        if command.command_type == CommandType.CREATE_EDGE:
            if command.payload.get("source_id") == command.payload.get("target_id"):
                raise SelfReferenceError()

        # 5. Sanitize string properties (prevent Cypher injection)
        command.payload = self.sanitize_payload(command.payload)

        return command
```

### 11.2 Where it sits in the pipeline

```
Agent output → GraphACL.validate_command() → CommandQueue.enqueue() → ConductorExecutor
```

The ACL rejects invalid commands *before* they enter the queue, keeping the queue clean and reducing unnecessary lock acquisitions.

---

## 12. API surface additions

### 12.1 New REST endpoints

```
POST   /api/graph-conductor-command                         — Submit a command (or batch)
GET    /api/graph-conductor-command/{id}                    — Check command status
GET    /api/graph-conductor-command-event                   — List events (with filtering)
POST   /api/graph-conductor-command-revert-event/{event_id} — Revert a specific event
POST   /api/graph-conductor-command-revert/{correlation_id} — Revert an entire enrichment run
GET    /api/graph-conductor-timeline                        — Investigation timeline
GET    /api/graph-conductor-dead-letter-queue               — Dead letter queue listing
POST   /api/graph-conductor-dead-letter-queue-retry         — Retry dead-lettered command
WS     /api/graph-conductor-command-stream                  — WebSocket for real-time events
```

### 12.2 WebSocket event stream

Real-time graph updates for the frontend's Sigma.js visualization.
The frontend subscribes via WebSocket and receives events as they are committed:

```json
{
  "event_type": "NODE_CREATED",
  "event_id": "abc-123",
  "node_id": "domain-xyz",
  "entity_type": "Domain",
  "agent_id": "enrichment-agent-01",
  "timestamp": "2026-03-26T14:30:00Z",
  "correlation_id": "enrichment-run-456"
}
```

---

## 13. Migration strategy

### 13.1 Phased rollout

**Phase 1 — Foundation (non-breaking).** Add `conductor/` module to `flowsint-core`. Run Neo4j migration to add `_version` properties. Create PostgreSQL `graph_events` and `dead_letter_commands` tables. Deploy Conductor Executor alongside existing Celery worker.

**Phase 2 — Opt-in adoption.** Introduce `ConductorAwareTransform` base class. Migrate 3-5 high-concurrency enrichers (e.g., `domain_to_subdomains`, `domain_to_ip`) to emit commands instead of direct writes. Legacy enrichers continue working unchanged.

**Phase 3 — Default path.** Make `ConductorAwareTransform` the default base class. Add conductor middleware to `TransformOrchestrator` that auto-wraps legacy `postprocess()` calls in commands. Expose revert UI in the frontend.

**Phase 4 — Multi-agent support.** Add agent registration, role assignment, and the WebSocket event stream. Enable concurrent agent operation on the same workspace.

### 13.2 Backward compatibility

The `TransformOrchestrator` detects which base class an enricher extends:

```python
if isinstance(enricher, ConductorAwareTransform):
    commands = await enricher.postprocess(scan_results)
    # Commands go through Conductor pipeline
else:
    # Legacy path: wrap the direct write in a command transparently
    command = self.conductor.wrap_legacy_write(enricher, scan_results)
    await self.conductor.enqueue(command)
```

This means **zero existing enrichers need modification** in Phases 1-2. Third-party and community enrichers (like the HudsonRock integration in PR #129) continue working.

---

## 14. Performance considerations

### 14.1 Overhead budget

The Conductor adds latency to each mutation.
Target overhead budget per command:

| Step | Target latency | Mechanism |
|------|---------------|-----------|
| Queue enqueue | < 1ms | Redis ZADD |
| Idempotency check | < 1ms | Redis GET |
| Permission check | < 0.1ms | In-memory lookup |
| Lock acquisition | < 5ms (uncontended) | Redis SET NX |
| Version check | < 2ms | Neo4j property read |
| Before-state snapshot | < 10ms | Neo4j MATCH by ID |
| Mutation execution | Variable | Existing enricher latency |
| After-state snapshot | < 10ms | Neo4j MATCH by ID |
| Event write | < 5ms | PostgreSQL INSERT |
| Lock release | < 1ms | Redis Lua script |
| **Total overhead** | **< 35ms** | Excluding mutation itself |

For enrichers whose `scan()` phase takes 1-10 seconds (external API calls), **35ms overhead is negligible** — less than 3.5% of total enrichment time.

### 14.2 Throughput targets

With Redis queue and node-level locking, the Conductor supports:

- **100+ concurrent agents** per workspace (limited by distinct graph regions)
- **1,000+ commands/second** throughput per workspace partition
- **Event store** write throughput limited by PostgreSQL (~10,000 inserts/second with batching)

### 14.3 Event store compaction

For long-running investigations, the event store grows continuously. A background compaction job periodically creates **checkpoint snapshots** and marks pre-checkpoint events as compacted (queryable but not individually revertible):

```sql
-- Compaction creates a checkpoint every 1000 events per workspace
INSERT INTO graph_checkpoints (workspace_id, checkpoint_at, full_state)
SELECT workspace_id, NOW(), graph_state_snapshot(workspace_id)
FROM graph_events
GROUP BY workspace_id
HAVING count(*) > 1000;
```

---

## 15. Testing strategy

### 15.1 Unit tests

Each conductor component is independently testable with mocked Redis and Neo4j:

- `test_command_queue.py` — Enqueue/dequeue ordering, priority, TTL expiry
- `test_region_lock.py` — Lock acquisition, sorted-order deadlock prevention, TTL expiry
- `test_version_vector.py` — Check-and-increment atomicity, conflict detection
- `test_event_store.py` — Event recording, revert generation, correlation queries
- `test_permissions.py` — Role matrix enforcement, entity-type restrictions
- `test_idempotency.py` — Duplicate detection, cached result retrieval
- `test_dead_letter.py` — Failure capture, retry, inspection

### 15.2 Integration tests

- **Concurrent enrichment test**: Two enrichers mutating the same node simultaneously — verify version conflict is detected and one retries
- **Revert chain test**: Execute 5 enrichments, revert the 3rd, verify graph state matches expected
- **Dead letter test**: Submit command with wrong permissions, verify it lands in DLQ
- **Legacy compatibility test**: Run existing enricher through Conductor wrapper, verify identical Neo4j output

### 15.3 Load tests

- 50 concurrent agents enriching a 10,000-node graph for 10 minutes
- Measure: command throughput, lock contention rate, version conflict rate, event store growth, p99 latency

---

## 16. Open questions and future work

**Distributed Conductor.** This RFC assumes a single Conductor Executor instance per workspace partition. For horizontal scaling across multiple machines, the lock mechanism would need migration from Redis standalone to RedLock (Redis distributed lock algorithm) or a dedicated coordination service like etcd.

**Graph-level ACID transactions.** Neo4j supports multi-statement transactions, but the current enricher pipeline does not use them consistently. A future enhancement could make each command execute within a single Neo4j transaction, providing true atomicity at the database level in addition to application-level coordination.

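
As a rough illustration of the all-or-nothing behavior such per-command transactions would provide, here is a sketch using an in-memory stand-in for the database (the `InMemoryGraph` class and its methods are hypothetical, for illustration only; the real change would live in the Neo4j driver calls):

```python
import copy

class InMemoryGraph:
    """Toy stand-in for Neo4j, used only to illustrate transactional semantics."""

    def __init__(self):
        self.nodes: dict[str, dict] = {}

    def apply_atomically(self, statements) -> bool:
        """Apply a list of (node_id, properties) writes all-or-nothing.

        Mutations run against a working copy; the copy replaces the live
        state only if every statement succeeds, mirroring commit/rollback.
        A properties value of None means "delete this node".
        """
        working = copy.deepcopy(self.nodes)
        try:
            for node_id, props in statements:
                if props is None:
                    del working[node_id]  # KeyError if the node does not exist
                else:
                    working.setdefault(node_id, {}).update(props)
        except KeyError:
            return False  # rollback: live state untouched
        self.nodes = working  # commit
        return True
```

A statement that fails mid-batch leaves the graph exactly as it was, which is the guarantee the application layer currently has to approximate with compensating commands.
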

**Agent negotiation protocol.** When two agents need the same region, the current design uses simple backoff-and-retry. A future version could implement agent-to-agent negotiation where a `validation` agent can signal an `enrichment` agent to pause, reducing wasted retries.

**Frontend integration.** The revert UI in the frontend (showing investigation timeline, per-event undo buttons, enrichment run grouping) is out of scope for this RFC but is the natural next step after Phase 3.

**Conflict-free replicated data types (CRDTs).** For properties that are inherently mergeable (e.g., tag sets, confidence scores), CRDT-based conflict resolution could eliminate version conflicts entirely for specific property types.

---

## 17. Decision log

| Decision | Rationale | Alternatives considered |
|----------|-----------|------------------------|
| Redis for command queue | Already in stack (Celery broker); sub-ms latency; sorted sets provide priority ordering | Kafka (overkill for single-node), PostgreSQL LISTEN/NOTIFY (higher latency) |
| PostgreSQL for event store | Already in stack; strong ACID guarantees for audit log; JSONB for flexible payloads | Dedicated event store (EventStoreDB — adds operational complexity), Neo4j itself (mixing concerns) |
| Node-level lock granularity | Maximizes concurrency; most enrichers touch disjoint nodes | Edge-level (too fine-grained, high overhead), Workspace-level (too coarse, serializes all work) |
| Sorted-order lock acquisition | Simple deadlock prevention without detection algorithms | Wait-die/wound-wait (more complex), Timeout-only (risks livelock) |
| Version vectors as integers | Simple; sufficient for single-writer-per-node-at-a-time model | Vector clocks (needed for distributed systems, overkill here), Lamport timestamps |
| Compensating commands for revert | Operation-agnostic; works with existing Neo4j driver | Neo4j time-travel (not supported natively), CQRS event replay (requires full projection rebuild) |

---

## Appendix A: Superpowers compatibility

This RFC is designed to work with the [superpowers](https://github.com/obra/superpowers) agentic skills framework. A companion SKILL.md can be created at `skills/graph-conductor/SKILL.md`:

```yaml
---
name: graph-conductor
description: Use when implementing or modifying graph mutations in flowsint — ensures all Neo4j writes go through the Conductor coordination layer with proper commands, locking, versioning, and event sourcing.
---
```

The skill would guide Claude agents to:

1. Never write to Neo4j directly — always emit `GraphCommand` objects
2. Declare target regions before starting work (claim-before-work)
3. Include `expected_versions` for optimistic concurrency
4. Generate deterministic `idempotency_key` values
5. Handle `VERSION_CONFLICT` and `REGION_BUSY` responses gracefully

---

## Appendix B: Glossary

| Term | Definition |
|------|-----------|
| **Graph Conductor** | The coordination layer managing concurrent graph mutations |
| **GraphCommand** | An immutable, serializable description of a graph mutation |
| **Region** | A set of node/edge IDs that a command reads or writes |
| **Version vector** | An integer version counter on each node/edge for conflict detection |
| **Event** | An immutable record of a command's execution, including before/after state |
| **Correlation ID** | Groups all commands/events from a single enrichment run |
| **Causation ID** | Links a command to the command that triggered it |
| **Dead letter queue** | Storage for commands that failed after all retries |
| **ACL (Anti-Corruption Layer)** | Validation boundary between agent output and the command queue |
| **Compensating command** | A command that reverses the effect of a previous command |

---

*This RFC is a living document. Feedback welcome via GitHub issues on `reconurge/flowsint` referencing this design.*
@m13v commented on GitHub (Mar 26, 2026):

the claim-before-work protocol is the right call. we run 8+ Claude agents in parallel on the same codebase and the hardest part isn't the git layer (worktrees solve that) - it's coordinating shared resources outside of version control. browser sessions, build caches, port ranges, database connections. your region locking maps directly to what we ended up building: a simple file-based lock that agents must acquire before touching a shared resource, with a heartbeat to detect dead agents that never released. the dead letter queue is smart too, we had a case where an agent crashed mid-mutation and left the browser in a half-navigated state. without something like your event sourcing for rollback, we had to manually clean up. one thing to watch: the version vector overhead can get expensive if your graph is large. we found that coarse-grained locks (per-resource-type rather than per-node) were good enough for our use case and way cheaper to maintain.

<!-- gh-comment-id:4132426499 -->
@m13v commented on GitHub (Mar 26, 2026):

our browser lock implementation that prevents multiple agents from fighting over shared resources: https://github.com/m13v/browser-lock/blob/main/playwright-lock.sh

and the tmux-based agent orchestration that manages parallel sessions, worktree creation, and completion detection: https://github.com/m13v/tmux-background-agents/blob/main/SKILL.md

<!-- gh-comment-id:4132431187 -->
@gustavorps commented on GitHub (Mar 27, 2026):

Thank you very much @m13v for sharing your experience and contributions with us!
There is a lot of valuable feedback and insight here! I especially like the tip about coarse-grained locks (by resource type rather than by node); I believe this approach is worth testing.

I’m still wrapping my head around the complexity of the feature, so I’ll take the time to break it down into smaller parts.
With dedication, the support of a good AI model, and a little luck, we’ll have this functionality up and running by the end of this semester.

<!-- gh-comment-id:4139930672 -->
@m13v commented on GitHub (Mar 27, 2026):

Glad the coarse-grained locking tip resonated! That approach saved us a ton of complexity vs trying to lock individual nodes.

Breaking it into smaller parts is definitely the way to go. If it helps, we found starting with a simple file-based lock per resource type (graphs, nodes, edges as separate lock scopes) was enough to validate the design before adding heartbeat and dead agent detection later.

Happy to compare notes as you prototype it.

<!-- gh-comment-id:4140320367 -->