mirror of
https://github.com/reconurge/flowsint.git
synced 2026-05-07 04:09:49 -05:00
[GH-ISSUE #133] [DRAFT] RFC: Graph Conductor - Concurrency, Locking, and Coordination for Multi-Agent Graph Mutations in Flowsint v2 #94
Originally created by @gustavorps on GitHub (Mar 26, 2026).
Original GitHub issue: https://github.com/reconurge/flowsint/issues/133
RFC: Graph Conductor - Concurrency, Locking, and Coordination for Multi-Agent Graph Mutations in Flowsint v2
Summary
Flowsint's knowledge graph currently allows Celery workers to write nodes and edges to Neo4j without coordination, locking, or rollback capability. As the platform evolves toward multi-agent enrichment — where multiple Claude agents simultaneously populate, enrich, expand, and delete graph regions — this lack of coordination creates race conditions, phantom reads, conflicting writes, and irrecoverable state corruption.
Graph Conductor is a new coordination layer inside `flowsint-core` that sits between agents/enrichers and Neo4j. It provides a command queue, graph region locking, optimistic concurrency control via version vectors, event sourcing for full reversibility, a claim-before-work protocol, role-scoped agent permissions, idempotent command execution, and a dead letter queue for failed mutations. Together, these mechanisms make the knowledge graph safe for concurrent multi-agent operation while enabling the "Agnostic Revert" capability requested in issue #127.
1. Motivation and problem statement
1.1 Current architecture gaps
Flowsint's existing mutation path is straightforward but unsafe under concurrency:
Three specific problems arise when multiple agents operate simultaneously:
Lost updates. Agent A reads a `Domain` node, enriches it with WHOIS data, and writes back. Agent B reads the same node before A's write, enriches it with DNS data, and writes back — overwriting A's WHOIS additions. Neo4j has no application-level optimistic concurrency control; the last writer wins silently.
Phantom graph regions. Agent A discovers 50 subdomains and begins creating nodes. Agent B starts a cleanup task that deletes "stale" nodes in the same workspace. B deletes nodes A is still linking to, leaving orphaned edges pointing at nonexistent targets.
Irreversible mutations. Issue #109 requests undo-enrichment capability. Issue #127 proposes an "Agnostic Revert Feature" — a mechanism to reverse any graph operation regardless of type. Today, once `postprocess()` writes to Neo4j, there is no record of what changed and no way to reverse it. The three-phase pipeline (preprocess → scan → postprocess) lacks a fourth phase: record what happened so it can be undone.
1.2 Why this matters for multi-agent OSINT
The move toward AI-powered investigation means multiple Claude agents will operate on the same investigation graph concurrently:
Without coordination, these agents corrupt each other's work. Graph Conductor solves this.
1.3 Relationship to existing issues
2. Architecture overview
Graph Conductor introduces a new module, `flowsint-core/src/flowsint/core/conductor/`, that intercepts all graph mutations before they reach Neo4j. It does not replace the existing Transform pipeline; it wraps `postprocess()` with coordination guarantees.
2.1 System context
2.2 Module placement
Graph Conductor lives inside `flowsint-core` and depends only on `flowsint-types`. It does not depend on `flowsint-enrichers` or `flowsint-api`, preserving the existing dependency hierarchy.
2.3 New directory structure
3. Command queue pattern
3.1 Design
Every graph mutation is expressed as a GraphCommand — a serializable, immutable data object that describes what should happen without executing it. Commands enter a Redis-backed FIFO queue partitioned by workspace ID, ensuring that mutations within a single investigation are ordered while mutations across investigations execute in parallel.
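A minimal sketch of what such a command could look like. The field names `target_node_ids`, `target_edge_ids`, `expected_versions`, and `idempotency_key` come from later sections of this RFC; everything else (the exact types, `command_id`, `to_json()`) is an illustrative assumption, not the final schema.

```python
import json
import uuid
from dataclasses import dataclass, field, asdict

# Hypothetical sketch of a GraphCommand: an immutable, serializable
# description of a graph mutation. Tuples are used instead of lists so
# the dataclass can be frozen (immutable).
@dataclass(frozen=True)
class GraphCommand:
    command_type: str                 # e.g. "CREATE_NODE", "DELETE_EDGE"
    workspace_id: str                 # queue partition key
    agent_id: str                     # issuing agent
    payload: tuple = ()               # key/value pairs describing the mutation
    target_node_ids: tuple = ()       # claim-before-work region declaration
    target_edge_ids: tuple = ()
    expected_versions: tuple = ()     # (target_id, version) pairs read by the agent
    idempotency_key: str = ""
    command_id: str = field(default_factory=lambda: str(uuid.uuid4()))

    def to_json(self) -> str:
        """Serialize for the Redis-backed queue."""
        return json.dumps(asdict(self), sort_keys=True)

cmd = GraphCommand(
    command_type="CREATE_NODE",
    workspace_id="ws-1",
    agent_id="agent-enrich-1",
    payload=(("label", "Domain"), ("value", "example.com")),
    target_node_ids=("node-42",),
)
```

Freezing the dataclass enforces the "immutable data object" property: once a command is built, no code path can quietly alter what it describes.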
3.2 Command types
3.3 Queue mechanics
Commands are enqueued to Redis sorted sets keyed by `conductor:queue:{workspace_id}`, scored by `(priority, timestamp)`. The Conductor Executor polls queues using `BZPOPMIN` for blocking dequeue with automatic priority ordering.
3.4 Integration with existing Transform pipeline
The current `Transform.postprocess()` method writes directly to Neo4j. Graph Conductor wraps this by introducing `ConductorAwareTransform`. Existing enrichers continue to work unchanged — the `TransformOrchestrator` detects whether the enricher extends `ConductorAwareTransform` or the legacy `Transform` and routes accordingly. This provides a non-breaking migration path.
4. Graph region locking
4.1 Region definition
A graph region is a set of node IDs and edge IDs that a command intends to read or write. Regions are declared upfront in `target_node_ids` and `target_edge_ids` on each `GraphCommand`. This is the claim-before-work pattern: agents must declare their intent before execution begins.
4.2 Lock granularity
The locking system operates at two granularities:
`lock:node:{node_id}`
`lock:workspace:{workspace_id}`
Node-level locks use Redis `SET NX EX` (set-if-not-exists with expiry) for distributed mutual exclusion. Workspace-level locks are acquired as a Redis `SETNX` on the workspace key and block all node-level locks within that workspace.
4.3 Lock acquisition protocol
Deadlock prevention: Locks are always acquired in sorted order of target IDs. Combined with TTL-based auto-expiry, this eliminates deadlock scenarios entirely.
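An in-memory sketch of the protocol described above. The `lock:node:{node_id}` key shape comes from section 4.2; the dict stands in for Redis `SET NX EX`, and the all-or-nothing rollback on partial acquisition is an assumption about how the Conductor would behave.

```python
import time

# In-memory stand-in for Redis SET NX EX: lock key -> (holder, expiry).
locks: dict[str, tuple[str, float]] = {}

def try_acquire(agent_id: str, node_ids: list[str], ttl: float = 30.0) -> bool:
    now = time.monotonic()
    acquired = []
    # Sorted acquisition order means two agents contending for overlapping
    # regions always collide on the first shared ID, so no lock-wait cycle
    # (and thus no deadlock) can form.
    for node_id in sorted(node_ids):
        key = f"lock:node:{node_id}"
        holder = locks.get(key)
        if holder and holder[0] != agent_id and holder[1] > now:
            for k in acquired:        # roll back the partial acquisition
                del locks[k]
            return False
        locks[key] = (agent_id, now + ttl)  # TTL = auto-expiry for dead agents
        acquired.append(key)
    return True

def release(agent_id: str, node_ids: list[str]) -> None:
    for node_id in node_ids:
        key = f"lock:node:{node_id}"
        if locks.get(key, ("", 0.0))[0] == agent_id:
            del locks[key]

ok_a = try_acquire("agent-a", ["n2", "n1"])        # acquires n1, then n2
ok_b = try_acquire("agent-b", ["n2", "n3"])        # blocked: n2 held by agent-a
release("agent-a", ["n1", "n2"])
ok_b_retry = try_acquire("agent-b", ["n2", "n3"])  # succeeds after release
```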
4.4 Claim-before-work pattern
Agents must declare the graph region they intend to mutate before performing any external OSINT calls. This happens during the enricher's `preprocess()` phase. If a claim cannot be granted (region already locked), the agent receives a `REGION_BUSY` response with the lock holder's agent ID and estimated completion time. The agent can then choose to wait, skip, or escalate.
5. Optimistic concurrency control with version vectors
5.1 Version vector design
Every node and edge in the Neo4j graph carries a `_version` integer property. When an agent reads a node, it captures the current version. When it submits a mutation command, it includes `expected_versions` mapping each target to the version it read. The Conductor checks these versions at execution time.
5.2 Conflict resolution strategy
When a version mismatch occurs:
The command is rejected and the agent receives a `VERSION_CONFLICT` event containing the current node state, allowing it to rebase its changes.
Commands that only add data can opt into automatic merging via `payload.merge_strategy = "additive"`.
Agents with the `admin` role can force-write, bypassing version checks. This is logged as a `FORCE_WRITE` event for audit.
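The check-and-increment at the heart of this scheme can be sketched in a few lines. The `_version` property and the `VERSION_CONFLICT` response come from this RFC; the exact response shape is an illustrative assumption.

```python
# In-memory stand-in for Neo4j node state with the _version property.
nodes = {"node-1": {"_version": 3, "value": "example.com"}}

def apply_with_version_check(node_id, expected_version, updates):
    node = nodes[node_id]
    if node["_version"] != expected_version:
        # Reject the stale write and hand back the current state so the
        # agent can rebase its changes (the VERSION_CONFLICT path).
        return {"status": "VERSION_CONFLICT", "current_state": dict(node)}
    node.update(updates)
    node["_version"] += 1  # incremented together with the write
    return {"status": "COMMITTED", "new_version": node["_version"]}

# Agent A and Agent B both read version 3. A commits first; B's write is
# rejected instead of silently overwriting A's enrichment (no lost update).
ok = apply_with_version_check("node-1", 3, {"whois": "REDACTED LLC"})
stale = apply_with_version_check("node-1", 3, {"dns": "1.2.3.4"})
```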
A migration in `neo4j-migrations/` adds the `_version` property to all existing nodes.
6. Event sourcing
6.1 Event store design
Every command execution produces an event — an immutable record of what changed. Events are stored in PostgreSQL (alongside existing user/chat data) in an append-only `graph_events` table. This provides the foundation for the "Agnostic Revert" capability requested in issue #127.
6.2 Event lifecycle
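A sketch of the lifecycle: each execution appends one record carrying `before_state` and `after_state` snapshots, and reverting restores the snapshot. The `before_state`/`after_state` and `correlation_id` fields appear in this RFC; the surrounding record shape is an assumption.

```python
import itertools

_event_ids = itertools.count(1)
graph_events: list[dict] = []  # append-only stand-in for the graph_events table
graph: dict[str, dict] = {}    # stand-in for Neo4j node state

def execute_and_record(node_id, new_state, correlation_id):
    before = graph.get(node_id)
    graph[node_id] = new_state
    graph_events.append({
        "event_id": next(_event_ids),
        "node_id": node_id,
        "correlation_id": correlation_id,  # groups one enrichment run
        "before_state": before,            # None means the node was created
        "after_state": new_state,
    })

def revert_event(event):
    # Agnostic revert: restore the snapshot, whatever the operation was.
    if event["before_state"] is None:
        graph.pop(event["node_id"], None)  # creation reverts to deletion
    else:
        graph[event["node_id"]] = event["before_state"]

execute_and_record("n1", {"label": "Domain", "value": "example.com"}, "run-7")
execute_and_record("n1", {"label": "Domain", "value": "example.com",
                          "whois": "REDACTED LLC"}, "run-8")
revert_event(graph_events[-1])  # undo the WHOIS enrichment
```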
6.3 Agnostic revert (addressing issue #127)
Reverting any operation is operation-type-agnostic because every event contains `before_state`; restoring that snapshot undoes the mutation regardless of what produced it. This directly solves issue #127 ("Agnostic Revert Feature") because:
`revert_correlation()` undoes an entire enrichment run (solving issue #109, "Undo enrichment")
`before_state`/`after_state` snapshots are self-contained — no enricher-specific logic needed
6.4 Time-travel queries
The event store enables investigation timeline reconstruction:
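One way such a query could work, sketched over the event shape from the previous section: replay `after_state` snapshots up to a cut-off to rebuild the graph as it existed at that point. The sequence-number cut-off is an assumption; a timestamp column would work the same way.

```python
# Event log stand-in; after_state=None encodes a deletion.
events = [
    {"seq": 1, "node_id": "n1", "after_state": {"value": "example.com"}},
    {"seq": 2, "node_id": "n2", "after_state": {"value": "1.2.3.4"}},
    {"seq": 3, "node_id": "n2", "after_state": None},
]

def graph_as_of(events, seq):
    """Rebuild node state from all events with seq <= the cut-off."""
    state = {}
    for ev in sorted(events, key=lambda e: e["seq"]):
        if ev["seq"] > seq:
            break
        if ev["after_state"] is None:
            state.pop(ev["node_id"], None)
        else:
            state[ev["node_id"]] = ev["after_state"]
    return state

at_2 = graph_as_of(events, 2)  # both nodes existed at this point
at_3 = graph_as_of(events, 3)  # n2 had already been deleted
```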
7. Idempotent commands
7.1 Problem
External OSINT APIs are unreliable. Network timeouts, Celery worker restarts, and agent retries can cause the same enrichment to be submitted multiple times. Without idempotency, a `domain_to_subdomains` enrichment could create duplicate nodes.
7.2 Idempotency key registry
Every `GraphCommand` carries an `idempotency_key` generated by the agent. The Conductor checks this key against a Redis-backed registry before execution.
7.3 Key generation convention
Agents generate idempotency keys using a deterministic hash of their intent.
When a duplicate is detected, the Conductor returns the cached result of the original execution without re-running the command.
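One plausible convention, shown as a sketch (the RFC only requires a deterministic hash of intent; the exact inputs chosen here are an assumption): hash the agent ID, command type, and sorted target identifiers so that retries of the same intent produce the same key.

```python
import hashlib
import json

def make_idempotency_key(agent_id: str, command_type: str,
                         target_ids: list[str]) -> str:
    # Canonical JSON (sorted keys, sorted targets) guarantees that the
    # same logical intent always hashes to the same key, even if the
    # caller passes targets in a different order.
    intent = json.dumps(
        {"agent": agent_id, "type": command_type, "targets": sorted(target_ids)},
        sort_keys=True,
    )
    return hashlib.sha256(intent.encode()).hexdigest()

k1 = make_idempotency_key("agent-1", "CREATE_NODE", ["n2", "n1"])
k2 = make_idempotency_key("agent-1", "CREATE_NODE", ["n1", "n2"])  # same intent
k3 = make_idempotency_key("agent-2", "CREATE_NODE", ["n1", "n2"])  # other agent
```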
8. Role-scoped agent permissions
8.1 Agent roles
Each Claude agent operates under a declared role that constrains what graph mutations it can perform. Roles are defined in a permission matrix:
The defined roles: `enrichment`, `validation`, `expansion`, `cleanup`, `triage`, `admin`, `readonly`.
8.2 Permission enforcement
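A sketch of how enforcement could look. The role names come from this RFC; the action vocabulary and the specific grants per role are illustrative assumptions, not the final matrix.

```python
# Hypothetical permission matrix: role -> allowed mutation actions.
PERMISSIONS = {
    "enrichment": {"create_node", "create_edge", "update_node"},
    "validation": {"update_node"},
    "expansion":  {"create_node", "create_edge"},
    "cleanup":    {"delete_node", "delete_edge"},
    "triage":     {"update_node"},
    "admin":      {"create_node", "create_edge", "update_node",
                   "delete_node", "delete_edge", "force_write"},
    "readonly":   set(),
}

class PermissionDenied(Exception):
    pass

def check_permission(role: str, action: str) -> None:
    # Unknown roles get the empty set, i.e. they are denied everything.
    if action not in PERMISSIONS.get(role, set()):
        raise PermissionDenied(f"role {role!r} may not {action!r}")

check_permission("enrichment", "create_node")   # allowed: no exception
try:
    check_permission("cleanup", "create_node")  # cleanup agents cannot create
    denied = False
except PermissionDenied:
    denied = True
```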
8.3 Entity-type restrictions (optional extension)
Roles can be further restricted by entity type. For example, a `crypto_enrichment` agent might only be allowed to create `CryptoWallet` and `CryptoTransaction` nodes. This maps naturally to flowsint-types' Pydantic model hierarchy.
9. Dead letter queue
9.1 Purpose
Commands that fail after all retries — due to persistent version conflicts, permission errors, Neo4j connectivity issues, or malformed payloads — are moved to a dead letter queue (DLQ) instead of being silently dropped.
9.2 Implementation
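A minimal in-memory sketch of the DLQ behavior: retry a bounded number of times, then park the command with its failure context for later inspection instead of dropping it. `MAX_RETRIES` and the record shape are assumptions.

```python
MAX_RETRIES = 3
dead_letter_queue: list[dict] = []  # stand-in for the dead_letter_commands table

def execute_with_dlq(command: dict, execute) -> bool:
    last_error = None
    for _attempt in range(MAX_RETRIES):
        try:
            execute(command)
            return True
        except Exception as exc:   # version conflicts, bad payloads, outages...
            last_error = exc
    # All retries exhausted: park the command instead of silently dropping it.
    dead_letter_queue.append({
        "command": command,
        "error": str(last_error),
        "attempts": MAX_RETRIES,   # kept for later inspection and replay
    })
    return False

def always_fails(cmd):
    raise ValueError("malformed payload")

ok = execute_with_dlq({"command_type": "CREATE_NODE"}, always_fails)
```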
9.3 DLQ API endpoints
New endpoints in `flowsint-api` for DLQ management.
10. Conductor executor — the orchestration loop
10.1 Execution flow
The Conductor Executor is the central loop that ties all components together. It runs as a Celery task (or standalone asyncio worker) alongside the existing Celery workers:
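The flow can be sketched as a compact loop. Every helper below is a stub standing in for a component described elsewhere in this RFC; the names and signatures are assumptions, and the point is only the ordering of the checks.

```python
from collections import deque

committed = []

def seen_before(cmd): return False          # idempotency registry (section 7)
def allowed(cmd): return True               # role permissions (section 8)
def acquire_locks(cmd): return True         # region locking (section 4)
def release_locks(cmd): pass
def versions_match(cmd): return True        # optimistic check (section 5)
def apply_and_record(cmd):                  # mutation + event (section 6)
    committed.append(cmd["id"])

def run_executor(queue: deque):
    while queue:
        cmd = queue.popleft()               # BZPOPMIN stand-in
        if seen_before(cmd) or not allowed(cmd):
            continue                        # cheap checks before locking
        if not acquire_locks(cmd):
            queue.append(cmd)               # region busy: retry later
            continue
        try:
            if versions_match(cmd):
                apply_and_record(cmd)
        finally:
            release_locks(cmd)              # always release, even on failure

run_executor(deque([{"id": "cmd-1"}, {"id": "cmd-2"}]))
```

Doing the idempotency and permission checks before lock acquisition keeps rejected commands from ever contending for the region.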
10.2 Execution guarantees
11. Anti-corruption layer
11.1 Boundary definition
The anti-corruption layer (ACL) is the single entry point where agent-produced data is validated, normalized, and sanitized before entering the graph. It extends flowsint-types' Pydantic validation with graph-specific invariants:
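A sketch of what such invariant checks could look like. The two rules shown (non-empty node labels, edges only between declared claim-before-work targets) are plausible examples chosen for illustration, not the RFC's actual rule set.

```python
def validate_command(cmd: dict) -> list[str]:
    """Return a list of violations; an empty list means the command passes."""
    errors = []
    if cmd.get("command_type") == "CREATE_NODE":
        # Graph-specific invariant: every node must carry a usable label.
        if not cmd.get("payload", {}).get("label", "").strip():
            errors.append("node label must be non-empty")
    if cmd.get("command_type") == "CREATE_EDGE":
        # Edges may only touch nodes the command claimed upfront.
        declared = set(cmd.get("target_node_ids", []))
        for end in ("source_id", "target_id"):
            if cmd.get(end) not in declared:
                errors.append(f"{end} not in declared region")
    return errors

good = validate_command({"command_type": "CREATE_NODE",
                         "payload": {"label": "Domain"}})
bad = validate_command({"command_type": "CREATE_EDGE",
                        "target_node_ids": ["n1"],
                        "source_id": "n1", "target_id": "n9"})
```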
11.2 Where it sits in the pipeline
The ACL rejects invalid commands before they enter the queue, keeping the queue clean and reducing unnecessary lock acquisitions.
12. API surface additions
12.1 New REST endpoints
12.2 WebSocket event stream
Real-time graph updates for the frontend's Sigma.js visualization. The frontend subscribes via WebSocket and receives events as they are committed:
13. Migration strategy
13.1 Phased rollout
Phase 1 — Foundation (non-breaking). Add the `conductor/` module to `flowsint-core`. Run the Neo4j migration to add `_version` properties. Create the PostgreSQL `graph_events` and `dead_letter_commands` tables. Deploy the Conductor Executor alongside the existing Celery worker.
Phase 2 — Opt-in adoption. Introduce the `ConductorAwareTransform` base class. Migrate 3-5 high-concurrency enrichers (e.g., `domain_to_subdomains`, `domain_to_ip`) to emit commands instead of direct writes. Legacy enrichers continue working unchanged.
Phase 3 — Default path. Make `ConductorAwareTransform` the default base class. Add conductor middleware to `TransformOrchestrator` that auto-wraps legacy `postprocess()` calls in commands. Expose the revert UI in the frontend.
Phase 4 — Multi-agent support. Add agent registration, role assignment, and the WebSocket event stream. Enable concurrent agent operation on the same workspace.
13.2 Backward compatibility
The `TransformOrchestrator` detects which base class an enricher extends. This means zero existing enrichers need modification in Phases 1-2. Third-party and community enrichers (like the HudsonRock integration in PR #129) continue working.
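The detection can be as simple as an `isinstance` check. The class names follow the RFC; the bodies are placeholder assumptions standing in for the real direct-write and command-emitting paths.

```python
class Transform:
    """Stand-in for the existing base class: postprocess() writes directly."""
    def postprocess(self, results):
        return "direct-write"            # legacy path: straight to Neo4j

class ConductorAwareTransform(Transform):
    """New base class: postprocess() yields commands for the Conductor."""
    def postprocess(self, results):
        return "queued-commands"         # new path: commands to the queue

def route(enricher, results):
    # Subclass check must come first, since ConductorAwareTransform
    # is itself a Transform.
    if isinstance(enricher, ConductorAwareTransform):
        return ("conductor", enricher.postprocess(results))
    return ("legacy", enricher.postprocess(results))

legacy_path = route(Transform(), [])
new_path = route(ConductorAwareTransform(), [])
```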
14. Performance considerations
14.1 Overhead budget
The Conductor adds latency to each mutation. Target overhead budget per command:
For enrichers whose `scan()` phase takes 1-10 seconds (external API calls), 35ms overhead is negligible — less than 3.5% of total enrichment time.
14.2 Throughput targets
With Redis queue and node-level locking, the Conductor supports:
14.3 Event store compaction
For long-running investigations, the event store grows continuously. A background compaction job periodically creates checkpoint snapshots and marks pre-checkpoint events as compacted (queryable but not individually revertible):
15. Testing strategy
15.1 Unit tests
Each conductor component is independently testable with mocked Redis and Neo4j:
`test_command_queue.py` — Enqueue/dequeue ordering, priority, TTL expiry
`test_region_lock.py` — Lock acquisition, sorted-order deadlock prevention, TTL expiry
`test_version_vector.py` — Check-and-increment atomicity, conflict detection
`test_event_store.py` — Event recording, revert generation, correlation queries
`test_permissions.py` — Role matrix enforcement, entity-type restrictions
`test_idempotency.py` — Duplicate detection, cached result retrieval
`test_dead_letter.py` — Failure capture, retry, inspection
15.2 Integration tests
15.3 Load tests
16. Open questions and future work
Distributed Conductor. This RFC assumes a single Conductor Executor instance per workspace partition. For horizontal scaling across multiple machines, the lock mechanism would need migration from Redis standalone to RedLock (Redis distributed lock algorithm) or a dedicated coordination service like etcd.
Graph-level ACID transactions. Neo4j supports multi-statement transactions, but the current enricher pipeline does not use them consistently. A future enhancement could make each command execute within a single Neo4j transaction, providing true atomicity at the database level in addition to application-level coordination.
Agent negotiation protocol. When two agents need the same region, the current design uses simple backoff-and-retry. A future version could implement agent-to-agent negotiation where a `validation` agent can signal an `enrichment` agent to pause, reducing wasted retries.
Frontend integration. The revert UI in the frontend (showing investigation timeline, per-event undo buttons, enrichment run grouping) is out of scope for this RFC but is the natural next step after Phase 3.
Conflict-free replicated data types (CRDTs). For properties that are inherently mergeable (e.g., tag sets, confidence scores), CRDT-based conflict resolution could eliminate version conflicts entirely for specific property types.
17. Decision log
Appendix A: Superpowers compatibility
This RFC is designed to work with the superpowers agentic skills framework. A companion SKILL.md can be created at `skills/graph-conductor/SKILL.md`. The skill would guide Claude agents to:
Express all mutations as `GraphCommand` objects
Include `expected_versions` for optimistic concurrency
Generate deterministic `idempotency_key` values
Handle `VERSION_CONFLICT` and `REGION_BUSY` responses gracefully
Appendix B: Glossary
This RFC is a living document. Feedback welcome via GitHub issues on `reconurge/flowsint` referencing this design.
@m13v commented on GitHub (Mar 26, 2026):
the claim-before-work protocol is the right call. we run 8+ Claude agents in parallel on the same codebase and the hardest part isn't the git layer (worktrees solve that) - it's coordinating shared resources outside of version control. browser sessions, build caches, port ranges, database connections. your region locking maps directly to what we ended up building: a simple file-based lock that agents must acquire before touching a shared resource, with a heartbeat to detect dead agents that never released. the dead letter queue is smart too, we had a case where an agent crashed mid-mutation and left the browser in a half-navigated state. without something like your event sourcing for rollback, we had to manually clean up. one thing to watch: the version vector overhead can get expensive if your graph is large. we found that coarse-grained locks (per-resource-type rather than per-node) were good enough for our use case and way cheaper to maintain.
@m13v commented on GitHub (Mar 26, 2026):
our browser lock implementation that prevents multiple agents from fighting over shared resources: https://github.com/m13v/browser-lock/blob/main/playwright-lock.sh
and the tmux-based agent orchestration that manages parallel sessions, worktree creation, and completion detection: https://github.com/m13v/tmux-background-agents/blob/main/SKILL.md
@gustavorps commented on GitHub (Mar 27, 2026):
Thank you very much @m13v for sharing your experience and contributions with us!
There really is a lot of valuable feedback and insight here! Especially the tip about coarse-grained locks (by resource type rather than by node): I believe this approach is worth testing.
I’m still wrapping my head around the complexity of the feature, so I’ll take the time to break it down into smaller parts.
With dedication, the support of a good AI model, and a little luck, we’ll have this functionality up and running by the end of this semester.
@m13v commented on GitHub (Mar 27, 2026):
Glad the coarse-grained locking tip resonated! That approach saved us a ton of complexity vs trying to lock individual nodes.
Breaking it into smaller parts is definitely the way to go. If it helps, we found starting with a simple file-based lock per resource type (graphs, nodes, edges as separate lock scopes) was enough to validate the design before adding heartbeat and dead agent detection later.
Happy to compare notes as you prototype it.