[GH-ISSUE #127] [WiP] RFC: Agnostic Revert Feature #1051

Open
opened 2026-05-03 01:56:16 -05:00 by GiteaMirror · 0 comments

Originally created by @gustavorps on GitHub (Feb 27, 2026).
Original GitHub issue: https://github.com/reconurge/flowsint/issues/127

RFC: Agnostic Revert Feature for User and Enricher Actions

1. Overview

This RFC proposes an architecture and implementation strategy for a generic and reusable "Undo" feature within Flowsint. The feature will allow the system to revert creations, deletions, and updates on graph elements (FlowNode and FlowEdge), whether these actions are initiated manually by a user or automatically by an enricher/pipeline/flow.

The action history will be stored in PostgreSQL using a data model based on [Schema.org/Action](https://schema.org/Action), which also proposes a consistent ontology and data model for future implementations. The graph data itself remains in Neo4j.

2. Motivation

As users investigate data and run enrichers, the graph can quickly become cluttered or altered in unintended ways. Providing a robust undo feature allows users to easily roll back mistakes, revert noisy enricher outputs, and safely explore different investigation paths without permanently damaging the graph state.

3. Data Model Proposal

We will adapt the schema.org/Action model and map it to a PostgreSQL table (e.g., action_history) to keep track of changes.

  • @type (Action Type): e.g., CreateAction, DeleteAction, UpdateAction, EnrichAction.
  • agent: Who performed the action (a User ID or an Enricher ID).
  • object: A JSON representation of the entity state before the action (useful for updates/deletes).
  • result: A JSON representation of the entity state after the action (useful for creations/updates).
  • target (EntryPoint): Specifies the mechanism to revert the action: the repository name, function, and arguments required to execute the undo. In the example below it is stored as an EntryPoint inside potentialAction.
  • identifier: A unique ID or trace ID grouping multiple sub-actions (e.g., an enricher creating 10 nodes will share the same transaction identifier).

Example action record as stored in Postgres (JSON-LD):

{
  "@context": "https://schema.org",
  "@type": "CreateAction",
  "identifier": "txn-uuid-1234",
  "agent": {
    "@type": "SoftwareApplication",
    "name": "IP Enricher",
    "url": "python://flowsint.enrichers.ip.to_infos:IpToInfosEnricher?version=0.1.0"
  },
  "object": [
    {
      "@type": "WebSite",
      "id": "node-uuid-2",
      "url": "https://example.com"
    }
  ],
  "result": [
    {
      "@type": "FlowNode", 
      "id": "node-uuid-1", 
      "label": "IP", 
      "value": "8.8.8.8"
    },
    {
      "@type": "FlowNode", 
      "id": "node-uuid-2", 
      "label": "IP", 
      "value": "1.1.1.1"
    }
  ],
  "potentialAction":
  [
    {
      "@type": "EntryPoint",
      "url": "python://flowsint.command:UndoCreateCommand",
      "target": [
        {
          "identifier": "node-uuid-1"
        },
        {
          "identifier": "node-uuid-2"
        }
      ]
    }
  ]
}
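A record like the one above can be built and round-tripped with plain Python before any database is involved. The sketch below is illustrative only: `ActionRecord`, `to_jsonld`, and `from_jsonld` are hypothetical names, and it assumes the record is stored as text or JSONB and that only the first EntryPoint in potentialAction matters.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ActionRecord:
    """Hypothetical in-memory mirror of one action_history row (JSON-LD)."""
    action_type: str          # e.g. "CreateAction"
    identifier: str           # transaction ID shared by all sub-actions
    agent: Dict[str, Any]     # the user or enricher that acted
    obj: List[Dict[str, Any]] = field(default_factory=list)     # schema.org "object"
    result: List[Dict[str, Any]] = field(default_factory=list)  # schema.org "result"
    undo_url: str = ""        # EntryPoint URL of the undo command
    undo_targets: List[str] = field(default_factory=list)       # IDs to revert

    def to_jsonld(self) -> Dict[str, Any]:
        return {
            "@context": "https://schema.org",
            "@type": self.action_type,
            "identifier": self.identifier,
            "agent": self.agent,
            "object": self.obj,
            "result": self.result,
            "potentialAction": [{
                "@type": "EntryPoint",
                "url": self.undo_url,
                "target": [{"identifier": t} for t in self.undo_targets],
            }],
        }

    @classmethod
    def from_jsonld(cls, doc: Dict[str, Any]) -> "ActionRecord":
        # Only the first EntryPoint is read, matching the example record
        entry = (doc.get("potentialAction") or [{}])[0]
        return cls(
            action_type=doc["@type"],
            identifier=doc["identifier"],
            agent=doc["agent"],
            obj=doc.get("object", []),
            result=doc.get("result", []),
            undo_url=entry.get("url", ""),
            undo_targets=[t["identifier"] for t in entry.get("target", [])],
        )


record = ActionRecord(
    action_type="CreateAction",
    identifier="txn-uuid-1234",
    agent={"@type": "SoftwareApplication", "name": "IP Enricher"},
    result=[{"@type": "FlowNode", "id": "node-uuid-1", "label": "IP", "value": "8.8.8.8"}],
    undo_url="python://flowsint.command:UndoCreateCommand",
    undo_targets=["node-uuid-1"],
)
payload = json.dumps(record.to_jsonld())  # text that could go into a JSONB column
assert ActionRecord.from_jsonld(json.loads(payload)) == record
```

Keeping the undo targets as plain IDs means the reverse operation never has to re-parse the full `result` payload.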

4. Target Entities

The undo system will primarily target the following core components:

  1. src/flowsint_core/core/types.py:FlowNode: Undoing node creations, updates, and deletions.
  2. src/flowsint_core/core/types.py:FlowEdge: Undoing edge creations, updates, and deletions.
  3. src/flowsint_core/core/enricher_base.py:execute: A macro-action that encompasses multiple node and edge operations. Reverting an enricher execution means looking up all actions tied to the enricher's identifier (transaction ID) and undoing them in reverse order.
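The lookup for item 3 amounts to "filter by transaction, newest first". A minimal sketch, assuming each stored row carries the shared `identifier` and an ISO-8601 `startTime` (a schema.org/Action property); `actions_to_undo` is a hypothetical helper, and in Postgres the same thing would be a `WHERE` on the identifier plus `ORDER BY ... DESC`:

```python
from datetime import datetime
from typing import Any, Dict, List


def actions_to_undo(rows: List[Dict[str, Any]], txn_id: str) -> List[Dict[str, Any]]:
    """Return every action of one transaction, newest first."""
    matching = [row for row in rows if row.get("identifier") == txn_id]
    # Reverse chronological order: later sub-actions are undone first
    return sorted(
        matching,
        key=lambda row: datetime.fromisoformat(row["startTime"]),
        reverse=True,
    )


rows = [
    {"identifier": "txn-1", "@type": "CreateAction", "startTime": "2026-02-27T10:00:00", "result": "node"},
    {"identifier": "txn-1", "@type": "CreateAction", "startTime": "2026-02-27T10:00:05", "result": "edge"},
    {"identifier": "txn-2", "@type": "UpdateAction", "startTime": "2026-02-27T10:01:00", "result": "other"},
]
print([r["result"] for r in actions_to_undo(rows, "txn-1")])  # ['edge', 'node']
```

Reverse order matters because an edge created after its endpoint node must be removed before that node is.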

5. Architectural Design Patterns

To implement this agnostically, we can evaluate two core object-oriented design patterns.

Option A: The Command Pattern (Highly Recommended)

The Command pattern encapsulates a request as an object, thereby letting you parameterize clients with different requests, queue or log requests, and support undoable operations.

In this architecture, every mutation to the graph (Create, Update, Delete) is encapsulated in a Command object that knows how to execute itself and how to undo itself.

Python Implementation Example

from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional


class Command(ABC):
    """Base class for every undoable graph mutation."""

    @abstractmethod
    def execute(self) -> None:
        raise NotImplementedError

    @abstractmethod
    def undo(self) -> None:
        raise NotImplementedError

    @abstractmethod
    def dump_json(self) -> Dict[str, Any]:
        """Serializes the command into the schema.org/Action format for Postgres"""
        raise NotImplementedError

    @classmethod
    @abstractmethod
    def from_dict(cls, repo: Any, action_dict: Dict[str, Any]) -> "Command":
        """Deserializes the Postgres action back into an executable Command"""
        raise NotImplementedError


class GraphRepoCreateAction(Command):
    def __init__(
        self,
        repo: Any,
        node_data: List[dict],
        edge_data: List[dict],
        agent: Any,
        txn_id: str,
        action_service: Optional[Any] = None,
    ):
        self.repo = repo                      # Neo4j graph repository
        self.action_service = action_service  # Postgres action history service
        self.node_data = node_data  # Expects a list (can be empty, 1, or many)
        self.edge_data = edge_data  # Expects a list (can be empty, 1, or many)
        self.agent = agent
        self.txn_id = txn_id
        self.node_id_list: List[str] = []  # IDs of the nodes created by execute()
        self.edge_id_list: List[str] = []  # IDs of the edges created by execute()

    def execute(self) -> None:
        # Executes Neo4j operations for all nodes and edges in the arrays
        # TODO: implement the repository create logic and record the created
        # IDs in self.node_id_list / self.edge_id_list

        # Persist the macro-action to Postgres
        if self.action_service is not None:
            self.action_service.create_action_history(self.dump_json())

    def undo(self) -> None:
        # Reverts Neo4j operations by iterating over the saved target identifiers
        # TODO: also delete the IDs in self.edge_id_list
        for node_id in self.node_id_list:
            self.repo.delete_node(node_id)

        # Mark action as reverted/deleted in Postgres
        if self.action_service is not None:
            self.action_service.delete_action(self.txn_id)

    def dump_json(self) -> Dict[str, Any]:
        raise NotImplementedError

    @classmethod
    def from_dict(cls, repo: Any, action_dict: Dict[str, Any]) -> "GraphRepoCreateAction":
        """
        Reconstructs a GraphRepoCreateAction from the Postgres JSON dictionary.
        Safely handles parsing the arrays.
        """
        if action_dict.get("@type") != "GraphRepoCreateAction":
            raise ValueError(f"Invalid Action type for GraphRepoCreateAction: {action_dict.get('@type')}")

        txn_id = action_dict.get("identifier")
        agent = action_dict.get("agent")

        # Ensure result is treated as a list even if a legacy single-dict is pulled
        result_data = action_dict.get("result") or []
        if isinstance(result_data, dict):
            result_data = [result_data]
        if not result_data:
            raise ValueError("CreateAction must have a non-empty 'result' field representing created nodes")

        # Split the created entities by type (assumes edges are tagged "FlowEdge")
        node_data = [item for item in result_data if item.get("@type") == "FlowNode"]
        edge_data = [item for item in result_data if item.get("@type") == "FlowEdge"]

        # The node IDs created during execution are stored inside the EntryPoint target
        entry_points = action_dict.get("potentialAction") or []
        target = [t for entry in entry_points for t in entry.get("target", [])]
        if not target:
            raise ValueError("CreateAction must have a non-empty 'target' field representing created nodes")

        # Instantiate the command with the injected repository dependency
        command = cls(
            repo=repo,
            node_data=node_data,
            edge_data=edge_data,
            agent=agent,
            txn_id=txn_id,
        )

        # Restore the internal state that was generated during the original execution
        command.node_id_list = [t["identifier"] for t in target]
        return command

Usage Example for the "Undo" Resolver

When a user clicks "Undo" for a specific transaction (e.g., reverting a noisy enricher run), the orchestrator fetches the history from Postgres and reconstructs the objects:

ACTION_TYPE_CLS_MAP = {
    'GraphRepoCreateAction': GraphRepoCreateAction,
    'GraphRepoDeleteAction': GraphRepoDeleteAction,
    'GraphRepoUpdateAction': GraphRepoUpdateAction,
}
...
def undo_action(txn_id: str, graph_repo: Any, transaction_repo: Any) -> None:
    # 1. Fetch all actions tied to this transaction from Postgres (ordered by newest first)
    raw_actions = transaction_repo.get_actions_by_identifier(txn_id, order="DESC")

    # 2. Rebuild every command first, keeping the newest-first order
    commands: List[Command] = []
    for action_dict in raw_actions:
        action_type = action_dict.get("@type")

        # Simple factory lookup to route to the correct Command class
        command_cls = ACTION_TYPE_CLS_MAP.get(action_type)
        if command_cls is None:
            raise RuntimeError(f"ActionType {action_type} is not supported")
        commands.append(command_cls.from_dict(graph_repo, action_dict))

    # 3. Run the preflight checks (see Preflight Check below), then trigger the undo
    for command in commands:
        command.undo()

Pros:

  • Perfect fit for Undo/Redo: Inherently designed to support reversibility.
  • Agnostic Execution: The target.EntryPoint mapping is easily achieved since the command knows exactly which repository method to call for the undo.
  • Macro-Commands: Easy to group multiple CreateFlowNodeCommand and CreateFlowEdgeCommand into a MacroCommand to represent an Enricher.execute() run.

Cons:

  • Class Explosion: Requires creating a specific Command class for every type of mutation.
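The Macro-Command idea from the pros list can be sketched with toy commands. Everything here is hypothetical: `ToyGraph` stands in for Neo4j, and `AddNode`/`AddEdge` only play the roles of `CreateFlowNodeCommand` and `CreateFlowEdgeCommand`. The point is that the macro records what actually ran and undoes it in reverse.

```python
from abc import ABC, abstractmethod
from typing import List, Set, Tuple


class Command(ABC):
    @abstractmethod
    def execute(self) -> None: ...

    @abstractmethod
    def undo(self) -> None: ...


class ToyGraph:
    """Hypothetical stand-in for the Neo4j graph."""
    def __init__(self) -> None:
        self.nodes: Set[str] = set()
        self.edges: Set[Tuple[str, str]] = set()


class AddNode(Command):
    """Plays the role of CreateFlowNodeCommand."""
    def __init__(self, graph: ToyGraph, node_id: str) -> None:
        self.graph, self.node_id = graph, node_id

    def execute(self) -> None:
        self.graph.nodes.add(self.node_id)

    def undo(self) -> None:
        # Refuse to leave dangling edges behind
        if any(self.node_id in edge for edge in self.graph.edges):
            raise ValueError(f"node {self.node_id} still has edges")
        self.graph.nodes.discard(self.node_id)


class AddEdge(Command):
    """Plays the role of CreateFlowEdgeCommand."""
    def __init__(self, graph: ToyGraph, src: str, dst: str) -> None:
        self.graph, self.edge = graph, (src, dst)

    def execute(self) -> None:
        if not set(self.edge) <= self.graph.nodes:
            raise ValueError(f"edge {self.edge} references a missing node")
        self.graph.edges.add(self.edge)

    def undo(self) -> None:
        self.graph.edges.discard(self.edge)


class MacroCommand(Command):
    """Groups every command emitted by one Enricher.execute() run."""
    def __init__(self, commands: List[Command]) -> None:
        self.commands = commands
        self.done: List[Command] = []  # only commands that actually ran

    def execute(self) -> None:
        for command in self.commands:
            command.execute()
            self.done.append(command)

    def undo(self) -> None:
        # Undo newest first, so edges go before the nodes they connect
        while self.done:
            self.done.pop().undo()


graph = ToyGraph()
run = MacroCommand([AddNode(graph, "ip"), AddNode(graph, "domain"), AddEdge(graph, "domain", "ip")])
run.execute()
run.undo()
print(graph.nodes, graph.edges)  # set() set()
```

Tracking `done` separately from `commands` means a run that fails halfway can still be rolled back cleanly, since only the commands that succeeded are undone.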

Option B: The Memento Pattern

The Memento pattern captures and externalizes an object's internal state without violating encapsulation, allowing the object to be restored to this state later.

Python Implementation Example

class FlowNodeMemento:
    def __init__(self, state: dict):
        self._state = state
        
    def get_state(self) -> dict:
        # Return a copy so later edits to a restored node cannot alter the snapshot
        return dict(self._state)

class FlowNodeOriginator:
    def __init__(self, node_id: str, properties: dict):
        self.node_id = node_id
        self.properties = properties
        
    def save_to_memento(self) -> FlowNodeMemento:
        return FlowNodeMemento(self.properties.copy())
        
    def restore_from_memento(self, memento: FlowNodeMemento):
        self.properties = memento.get_state()
        # Call Neo4j repo to persist the restored state

class Caretaker:
    def __init__(self, db_client):
        self.db = db_client # Postgres client

    def save_state(self, action_id: str, memento: FlowNodeMemento):
        # Serialize memento and store in Postgres as schema.org/Action `object`
        self.db.insert_action(action_id, memento.get_state())

    def get_state(self, action_id: str) -> FlowNodeMemento:
        # Retrieve from Postgres
        state = self.db.get_action_object(action_id)
        return FlowNodeMemento(state)

Pros:

  • Great for Updates: Exceptionally good at handling complex state changes and reverting partial updates to a node.
  • Encapsulation: State logic is kept strictly inside the models.

Cons:

  • Doesn't map well to Creates/Deletes: Memento is designed for state restoration, not necessarily structural graph changes like adding or deleting nodes/edges.
  • Heavy Storage: Storing full object states for every minor change can quickly bloat the PostgreSQL database.
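One possible way to soften the storage cost, not part of the proposal above, is a memento that keeps only the previous values of the properties an update touched. A minimal sketch, assuming node properties are flat JSON-serializable dicts; `PropertyDiffMemento` is a hypothetical name:

```python
from typing import Any, Dict


class PropertyDiffMemento:
    """Stores only what an update changed, not the full node state."""
    def __init__(self, before: Dict[str, Any], update: Dict[str, Any]) -> None:
        # Old values of keys the update overwrote
        self.changed = {key: before[key] for key in update if key in before}
        # Keys the update introduced, which must be dropped on restore
        self.added = [key for key in update if key not in before]

    def to_json(self) -> Dict[str, Any]:
        # Small payload suitable for the schema.org/Action `object` field
        return {"changed": self.changed, "added": self.added}

    def restore(self, current: Dict[str, Any]) -> Dict[str, Any]:
        restored = dict(current)
        for key in self.added:
            restored.pop(key, None)
        restored.update(self.changed)
        return restored


before = {"label": "IP", "value": "8.8.8.8", "asn": "AS15169"}
update = {"asn": "AS0", "country": "US"}
memento = PropertyDiffMemento(before, update)
after = {**before, **update}
print(memento.to_json())  # {'changed': {'asn': 'AS15169'}, 'added': ['country']}
assert memento.restore(after) == before
```

The trade-off is that restoring needs the current state as input, so it pairs naturally with the preflight comparison of current versus recorded state.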

6. Preflight Check

To ensure that an undo operation is safe to execute and won't inadvertently corrupt the graph or overwrite newer changes, we should introduce a Preflight Check feature.

This feature will simulate or validate the undo operation before committing to it. For example, before reverting an update, the system should verify that no other user or enricher has modified the node in the meantime.

Here are the implementation steps for the Preflight Check feature:

Step 1: Define a Preflight Result Model

Create a standard response object that the preflight checks will return. This allows the system to differentiate between blocking errors (cannot undo) and warnings (can undo, but the user should be notified).

from typing import List
from dataclasses import dataclass, field

@dataclass
class PreflightResult:
    can_undo: bool = True
    warnings: List[str] = field(default_factory=list)
    errors: List[str] = field(default_factory=list)

    def merge(self, other: 'PreflightResult'):
        """Helper to combine results from a batch of commands"""
        self.can_undo = self.can_undo and other.can_undo
        self.warnings.extend(other.warnings)
        self.errors.extend(other.errors)

Step 2: Extend the Command Interface

Add a preflight_check() method to the abstract Command base class. Every command type must implement its own validation logic.

class Command(ABC):
    # ... existing execute, undo, dump_json, from_dict ...

    @abstractmethod
    def preflight_check(self) -> PreflightResult:
        """Validates if the undo operation can be safely executed."""
        pass

Step 3: Implement Check Logic per Command Type

Each specific command implements checks relevant to its operation.

Example 1: Undoing a Creation (Delete the node)
If we are undoing a node creation, we need to check if the node still exists and if it has new edges attached to it that weren't part of this transaction.

class GraphRepoCreateAction(Command):
    # ... existing init, execute, undo ...

    def preflight_check(self) -> PreflightResult:
        result = PreflightResult()

        for node_id in self.node_id_list:
            # 1. Check if node still exists
            current_node = self.repo.get_node(node_id)
            if not current_node:
                result.warnings.append(f"Node {node_id} has already been deleted.")
                continue

            # 2. Check for dependency conflicts: edges NOT created by this transaction
            # (assumes each edge returned by the repository exposes an `id`)
            edges = self.repo.get_edges_for_node(node_id)
            foreign_edges = [edge for edge in edges if edge.id not in self.edge_id_list]
            if foreign_edges:
                result.warnings.append(
                    f"Deleting Node {node_id} will also delete {len(foreign_edges)} edges created by other actions."
                )
                # Depending on business logic, this could be an error instead of a warning

        return result

Example 2: Undoing an Update (Revert to previous state)
If we are undoing an update, we must check for "intervening edits." If the current state of the node in Neo4j doesn't match the state we recorded after our action, someone else modified it.

class GraphRepoUpdateAction(Command):
    # ... assuming self.node_id_list, plus self.previous_state and self.new_state
    # stored as dicts keyed by node ID ...

    def preflight_check(self) -> PreflightResult:
        result = PreflightResult()

        for node_id in self.node_id_list:
            current_node = self.repo.get_node(node_id)
            if current_node is None:
                result.errors.append(f"Node {node_id} no longer exists and cannot be reverted.")
                result.can_undo = False
                continue

            # Check if the node has been modified since this action occurred
            # (compares the current Neo4j state to the 'result' saved in Postgres)
            if current_node.to_dict() != self.new_state[node_id]:
                result.errors.append(f"Node {node_id} has been modified by another action. Undoing will overwrite newer changes.")
                result.can_undo = False

        return result

Step 4: Integrate Preflight into the Undo Orchestrator

Update the resolver that handles user requests to undo a transaction. It should run the preflight checks before triggering any database mutations.

def revert_transaction_with_preflight(txn_id: str, repo: Any, postgres_db: Any, force: bool = False) -> Dict[str, Any]:
    raw_actions = postgres_db.get_actions_by_identifier(txn_id, order="DESC")
    commands_to_undo: List[Command] = []

    # 1. Build commands (same factory lookup as the "Undo" resolver)
    for action_dict in raw_actions:
        action_type = action_dict.get("@type")
        command_cls = ACTION_TYPE_CLS_MAP.get(action_type)
        if command_cls is None:
            raise RuntimeError(f"ActionType {action_type} is not supported")
        commands_to_undo.append(command_cls.from_dict(repo, action_dict))

    # 2. Run Preflight Checks; nothing is mutated yet
    master_preflight = PreflightResult()
    for command in commands_to_undo:
        master_preflight.merge(command.preflight_check())

    # 3. Handle Preflight Results
    if not master_preflight.can_undo and not force:
        raise ValueError(f"Cannot undo transaction due to conflicts: {master_preflight.errors}")

    if master_preflight.warnings and not force:
        # Return HTTP 409 Conflict or 428 Precondition Required to the frontend
        # so the UI can prompt the user: "Are you sure? Warnings: X, Y, Z"
        return {"status": "requires_confirmation", "warnings": master_preflight.warnings}

    # 4. Execute Undo (if safe or forced)
    for command in commands_to_undo:
        command.undo()

    return {"status": "success"}

Benefits of this Implementation:

  1. Safety: Prevents data corruption caused by race conditions or cascading deletes (e.g., deleting a node that is now the core of a newly built investigation path).
  2. UX Friendly: Exposes warnings to the frontend, allowing the UI to display confirmation dialogs ("This node has 3 new edges attached. Do you still want to revert?").
  3. Graceful Failures: Skips operations safely if an element is already in the desired state (e.g., trying to undo a CreateAction on a node that was already manually deleted by the user).
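The preflight flow can be exercised end to end with in-memory stand-ins. This is a sketch under stated assumptions: `InMemoryGraphRepo`, `ToyCreateCommand`, and `revert_prebuilt` are hypothetical, the repository mimics Cypher's `DETACH DELETE`, and the orchestrator takes already-built commands instead of reading Postgres.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Set, Tuple


@dataclass
class PreflightResult:
    can_undo: bool = True
    warnings: List[str] = field(default_factory=list)
    errors: List[str] = field(default_factory=list)

    def merge(self, other: "PreflightResult") -> None:
        self.can_undo = self.can_undo and other.can_undo
        self.warnings.extend(other.warnings)
        self.errors.extend(other.errors)


class InMemoryGraphRepo:
    """Hypothetical stand-in for the Neo4j repository."""
    def __init__(self) -> None:
        self.nodes: Dict[str, dict] = {}
        self.edges: Set[Tuple[str, str]] = set()

    def get_node(self, node_id: str):
        return self.nodes.get(node_id)

    def get_edges_for_node(self, node_id: str) -> List[Tuple[str, str]]:
        return [edge for edge in self.edges if node_id in edge]

    def delete_node(self, node_id: str) -> None:
        # Like DETACH DELETE: the node and its edges are removed
        self.nodes.pop(node_id, None)
        self.edges = {edge for edge in self.edges if node_id not in edge}


class ToyCreateCommand:
    """Undo side of a create: deletes the nodes it created."""
    def __init__(self, repo: InMemoryGraphRepo, node_ids: List[str]) -> None:
        self.repo, self.node_ids = repo, node_ids

    def preflight_check(self) -> PreflightResult:
        result = PreflightResult()
        for node_id in self.node_ids:
            if self.repo.get_node(node_id) is None:
                result.warnings.append(f"Node {node_id} has already been deleted.")
                continue
            edges = self.repo.get_edges_for_node(node_id)
            if edges:
                result.warnings.append(f"Deleting Node {node_id} will also delete {len(edges)} connected edges.")
        return result

    def undo(self) -> None:
        for node_id in self.node_ids:
            self.repo.delete_node(node_id)


def revert_prebuilt(commands: List[Any], force: bool = False) -> Dict[str, Any]:
    master = PreflightResult()
    for command in commands:
        master.merge(command.preflight_check())
    if not master.can_undo and not force:
        raise ValueError(f"Cannot undo transaction due to conflicts: {master.errors}")
    if master.warnings and not force:
        return {"status": "requires_confirmation", "warnings": master.warnings}
    for command in commands:
        command.undo()
    return {"status": "success"}


repo = InMemoryGraphRepo()
repo.nodes.update({"node-uuid-1": {"value": "8.8.8.8"}, "manual": {"value": "note"}})
repo.edges.add(("manual", "node-uuid-1"))  # edge a user attached later
commands = [ToyCreateCommand(repo, ["node-uuid-1"])]

first = revert_prebuilt(commands)
print(first["status"])  # requires_confirmation
second = revert_prebuilt(commands, force=True)
print(second["status"], sorted(repo.nodes), repo.edges)  # success ['manual'] set()
```

The first call stops at the warning without touching the graph; only the forced second call mutates it, which is the confirmation round trip the UI would drive.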

7. Recommended Approach

The Command Pattern is highly recommended for this implementation, potentially augmented with elements of Event Sourcing.

Implementation Steps:

  1. Command Invoker (Postgres Integration): Create an ActionHistoryService that acts as the Command Invoker. When an enricher calls execute(), all resulting node/edge creations are routed through this service, which executes each command on Neo4j and immediately persists the output of dump_json() to PostgreSQL.
  2. Enricher Transaction IDs: Modify Enricher.execute() to generate a unique identifier (UUID) per execution. All FlowNode and FlowEdge commands emitted during this execution will share this identifier.
  3. Undo Resolution: To undo an enricher run, query Postgres for all actions matching the identifier, sort them in reverse chronological order by timestamp, and execute the repository logic defined in the target (EntryPoint) field.
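The three steps can be wired together in a small in-memory sketch. All names are hypothetical: `InMemoryActionStore` stands in for the action_history table (insertion order replaces the timestamp sort), `CreateNode` is a minimal command exposing `dump_json`/`from_dict`, and `run_enricher` plays the part of a modified `Enricher.execute()` that generates one UUID per run.

```python
import uuid
from datetime import datetime, timezone
from typing import Any, Callable, Dict, List, Set


class InMemoryActionStore:
    """Hypothetical stand-in for the Postgres action_history table."""
    def __init__(self) -> None:
        self.rows: List[Dict[str, Any]] = []

    def insert(self, row: Dict[str, Any]) -> None:
        self.rows.append(row)

    def by_identifier(self, txn_id: str) -> List[Dict[str, Any]]:
        # Newest first; insertion order stands in for the timestamp ordering
        return [row for row in reversed(self.rows) if row["identifier"] == txn_id]


class CreateNode:
    """Minimal command: adds one node ID to a set-based graph."""
    def __init__(self, graph: Set[str], node_id: str) -> None:
        self.graph, self.node_id = graph, node_id

    def execute(self) -> None:
        self.graph.add(self.node_id)

    def undo(self) -> None:
        self.graph.discard(self.node_id)

    def dump_json(self) -> Dict[str, Any]:
        return {"@type": "CreateAction", "result": [{"id": self.node_id}]}

    @classmethod
    def from_dict(cls, graph: Set[str], row: Dict[str, Any]) -> "CreateNode":
        return cls(graph, row["result"][0]["id"])


class ActionHistoryService:
    """Command invoker: executes a command, then records it in the store."""
    def __init__(self, store: InMemoryActionStore) -> None:
        self.store = store

    def run(self, command: CreateNode, txn_id: str, agent: str) -> None:
        command.execute()
        record = command.dump_json()
        record.update(
            identifier=txn_id,
            agent=agent,
            startTime=datetime.now(timezone.utc).isoformat(),
        )
        self.store.insert(record)

    def undo_transaction(self, txn_id: str, rebuild: Callable[[Dict[str, Any]], CreateNode]) -> int:
        rows = self.store.by_identifier(txn_id)
        for row in rows:
            rebuild(row).undo()
        return len(rows)


def run_enricher(service: ActionHistoryService, graph: Set[str], values: List[str], name: str) -> str:
    txn_id = str(uuid.uuid4())  # one identifier per Enricher.execute() run
    for value in values:
        service.run(CreateNode(graph, value), txn_id, agent=name)
    return txn_id


graph: Set[str] = set()
service = ActionHistoryService(InMemoryActionStore())
txn = run_enricher(service, graph, ["8.8.8.8", "1.1.1.1"], "IP Enricher")
graph.add("user-note")  # unrelated manual change, must survive the undo
undone = service.undo_transaction(txn, lambda row: CreateNode.from_dict(graph, row))
print(undone, graph)  # 2 {'user-note'}
```

Because the invoker stamps every record with the run's identifier, the undo only touches that enricher's output and leaves unrelated edits in place.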