From b52cbbb904c8013b74308d58af88bc7dbb1b055c Mon Sep 17 00:00:00 2001 From: dextmorgn Date: Mon, 17 Nov 2025 23:05:09 +0100 Subject: [PATCH] feat(app): add key requirement for org_to_asn --- .../organization/to_asn.py | 137 ++++++++++-------- 1 file changed, 73 insertions(+), 64 deletions(-) diff --git a/flowsint-transforms/src/flowsint_transforms/organization/to_asn.py b/flowsint-transforms/src/flowsint_transforms/organization/to_asn.py index 6a36863..e68486f 100644 --- a/flowsint-transforms/src/flowsint_transforms/organization/to_asn.py +++ b/flowsint-transforms/src/flowsint_transforms/organization/to_asn.py @@ -1,10 +1,11 @@ -import json -import subprocess -from typing import List, Dict, Any, Union +import os +from typing import List, Dict, Any, Union, Optional from flowsint_core.core.transform_base import Transform +from flowsint_core.core.graph_db import Neo4jConnection from flowsint_types.organization import Organization from flowsint_types.asn import ASN from flowsint_core.core.logger import Logger +from tools.network.asnmap import AsnmapTool class OrgToAsnTransform(Transform): @@ -14,6 +15,39 @@ class OrgToAsnTransform(Transform): InputType = List[Organization] OutputType = List[ASN] + def __init__( + self, + sketch_id: Optional[str] = None, + scan_id: Optional[str] = None, + neo4j_conn: Optional[Neo4jConnection] = None, + vault=None, + params: Optional[Dict[str, Any]] = None, + ): + super().__init__( + sketch_id=sketch_id, + scan_id=scan_id, + neo4j_conn=neo4j_conn, + params_schema=self.get_params_schema(), + vault=vault, + params=params, + ) + + @classmethod + def required_params(cls) -> bool: + return True + + @classmethod + def get_params_schema(cls) -> List[Dict[str, Any]]: + """Declare required parameters for this transform""" + return [ + { + "name": "PDCP_API_KEY", + "type": "vaultSecret", + "description": "The ProjectDiscovery Cloud Platform API key for asnmap.", + "required": True, + }, + ] + @classmethod def name(cls) -> str: return "org_to_asn" @@ -42,74 +76,49 @@ class OrgToAsnTransform(Transform): async def scan(self, data: InputType) -> OutputType: """Find ASN information for organizations using asnmap.""" - asns: OutputType = [] + results: OutputType = [] + asnmap = AsnmapTool() + + # Retrieve API key from vault or environment + api_key = self.get_secret("PDCP_API_KEY", os.getenv("PDCP_API_KEY")) for org in data: - asn_data = self.__get_asn_from_asnmap(org.name) - if asn_data: - asns.append( - ASN( - number=int(asn_data["as_number"].lstrip("AS")), - name=asn_data["as_name"], - country=asn_data["as_country"], - cidrs=[], - ) - ) - else: - Logger.info( - self.sketch_id, {"message": f"No ASN found for org {org.name}."} - ) - return asns - - def __get_asn_from_asnmap(self, name: str) -> Dict[str, Any]: - try: - # Properly run the shell pipeline using shell=True - command = f"echo {name} | asnmap -silent -json | jq -s" - result = subprocess.run( - command, shell=True, capture_output=True, text=True, timeout=60 - ) - if not result.stdout.strip(): - return None try: - # Parse the JSON array - data_array = json.loads(result.stdout) - if not data_array: - return None - - combined_data = { - "as_range": [], - "as_name": None, - "as_country": None, - "as_number": None, - } - - for data in data_array: - if "as_range" in data: - combined_data["as_range"].extend(data["as_range"]) - if data.get("as_name") and not combined_data["as_name"]: - combined_data["as_name"] = data["as_name"] - if data.get("as_country") and not combined_data["as_country"]: - combined_data["as_country"] = data["as_country"] - if data.get("as_number") and not combined_data["as_number"]: - combined_data["as_number"] = data["as_number"] - - return combined_data if combined_data["as_number"] else None - - except json.JSONDecodeError as e: + # Use asnmap tool to get ASN info, passing the API key + asn_data = asnmap.launch(org.name, type="org", api_key=api_key) + if asn_data and "as_number" in asn_data: + # Parse ASN number from string like "AS16276" to integer 16276 + asn_string = asn_data["as_number"] + asn_number = int(asn_string.replace("AS", "").replace("as", "")) + # Create ASN object with correct field mapping + asn = ASN( + number=asn_number, + name=asn_data.get("as_name", ""), + country=asn_data.get("as_country", ""), + description=asn_data.get("as_name", ""), + ) + results.append(asn) + Logger.info( + self.sketch_id, + { + "message": f"[ASNMAP] Found AS{asn.number} ({asn.name}) for organization {org.name}" + }, + ) + else: + Logger.warn( + self.sketch_id, + { + "message": f"[ASNMAP] No ASN data or missing 'as_number' field for organization {org.name}. Data keys: {list(asn_data.keys()) if asn_data else 'None'}" + }, + ) + except Exception as e: Logger.error( self.sketch_id, - { - "message": f"An error occurred while parsing the JSON output from asnmap: {str(e)}" - }, + {"message": f"Error getting ASN for organization {org.name}: {e}"}, ) - return None + continue - except Exception as e: - Logger.error( - self.sketch_id, - {"message": f"An error occurred while running asnmap: {str(e)}"}, - ) - return None + return results def postprocess(self, results: OutputType, original_input: InputType) -> OutputType: # Create Neo4j relationships between organizations and their corresponding ASNs