feat(app): add key requirement for org_to_asn

This commit is contained in:
dextmorgn
2025-11-17 23:05:09 +01:00
parent 658d26fb9d
commit b52cbbb904

View File

@@ -1,10 +1,11 @@
import json
import subprocess
from typing import List, Dict, Any, Union
import os
from typing import List, Dict, Any, Union, Optional
from flowsint_core.core.transform_base import Transform
from flowsint_core.core.graph_db import Neo4jConnection
from flowsint_types.organization import Organization
from flowsint_types.asn import ASN
from flowsint_core.core.logger import Logger
from tools.network.asnmap import AsnmapTool
class OrgToAsnTransform(Transform):
@@ -14,6 +15,39 @@ class OrgToAsnTransform(Transform):
InputType = List[Organization]
OutputType = List[ASN]
def __init__(
self,
sketch_id: Optional[str] = None,
scan_id: Optional[str] = None,
neo4j_conn: Optional[Neo4jConnection] = None,
vault=None,
params: Optional[Dict[str, Any]] = None,
):
super().__init__(
sketch_id=sketch_id,
scan_id=scan_id,
neo4j_conn=neo4j_conn,
params_schema=self.get_params_schema(),
vault=vault,
params=params,
)
@classmethod
def required_params(cls) -> bool:
return True
@classmethod
def get_params_schema(cls) -> List[Dict[str, Any]]:
"""Declare required parameters for this transform"""
return [
{
"name": "PDCP_API_KEY",
"type": "vaultSecret",
"description": "The ProjectDiscovery Cloud Platform API key for asnmap.",
"required": True,
},
]
@classmethod
def name(cls) -> str:
return "org_to_asn"
@@ -42,74 +76,49 @@ class OrgToAsnTransform(Transform):
async def scan(self, data: InputType) -> OutputType:
"""Find ASN information for organizations using asnmap."""
asns: OutputType = []
results: OutputType = []
asnmap = AsnmapTool()
# Retrieve API key from vault or environment
api_key = self.get_secret("PDCP_API_KEY", os.getenv("PDCP_API_KEY"))
for org in data:
asn_data = self.__get_asn_from_asnmap(org.name)
if asn_data:
asns.append(
ASN(
number=int(asn_data["as_number"].lstrip("AS")),
name=asn_data["as_name"],
country=asn_data["as_country"],
cidrs=[],
)
)
else:
Logger.info(
self.sketch_id, {"message": f"No ASN found for org {org.name}."}
)
return asns
def __get_asn_from_asnmap(self, name: str) -> Dict[str, Any]:
try:
# Properly run the shell pipeline using shell=True
command = f"echo {name} | asnmap -silent -json | jq -s"
result = subprocess.run(
command, shell=True, capture_output=True, text=True, timeout=60
)
if not result.stdout.strip():
return None
try:
# Parse the JSON array
data_array = json.loads(result.stdout)
if not data_array:
return None
combined_data = {
"as_range": [],
"as_name": None,
"as_country": None,
"as_number": None,
}
for data in data_array:
if "as_range" in data:
combined_data["as_range"].extend(data["as_range"])
if data.get("as_name") and not combined_data["as_name"]:
combined_data["as_name"] = data["as_name"]
if data.get("as_country") and not combined_data["as_country"]:
combined_data["as_country"] = data["as_country"]
if data.get("as_number") and not combined_data["as_number"]:
combined_data["as_number"] = data["as_number"]
return combined_data if combined_data["as_number"] else None
except json.JSONDecodeError as e:
# Use asnmap tool to get ASN info, passing the API key
asn_data = asnmap.launch(org.name, type="org", api_key=api_key)
if asn_data and "as_number" in asn_data:
# Parse ASN number from string like "AS16276" to integer 16276
asn_string = asn_data["as_number"]
asn_number = int(asn_string.replace("AS", "").replace("as", ""))
# Create ASN object with correct field mapping
asn = ASN(
number=asn_number,
name=asn_data.get("as_name", ""),
country=asn_data.get("as_country", ""),
description=asn_data.get("as_name", ""),
)
results.append(asn)
Logger.info(
self.sketch_id,
{
"message": f"[ASNMAP] Found AS{asn.number} ({asn.name}) for organization {org.name}"
},
)
else:
Logger.warn(
self.sketch_id,
{
"message": f"[ASNMAP] No ASN data or missing 'as_number' field for organization {org.name}. Data keys: {list(asn_data.keys()) if asn_data else 'None'}"
},
)
except Exception as e:
Logger.error(
self.sketch_id,
{
"message": f"An error occurred while parsing the JSON output from asnmap: {str(e)}"
},
{"message": f"Error getting ASN for organization {org.name}: {e}"},
)
return None
continue
except Exception as e:
Logger.error(
self.sketch_id,
{"message": f"An error occurred while running asnmap: {str(e)}"},
)
return None
return results
def postprocess(self, results: OutputType, original_input: InputType) -> OutputType:
# Create Neo4j relationships between organizations and their corresponding ASNs