feat(transforms): remove some usage of preprocess

This commit is contained in:
dextmorgn
2025-11-19 12:12:15 +01:00
parent 62b0fc9fe0
commit dc87047027
5 changed files with 96 additions and 119 deletions

View File

@@ -505,18 +505,19 @@ class DomainToHistoryTransform(Transform):
# Process email addresses # Process email addresses
if individual.email_addresses: if individual.email_addresses:
for email in individual.email_addresses: for email_obj in individual.email_addresses:
if email and email not in processed_emails: email_str = email_obj.email
processed_emails.add(email) if email_str and email_str not in processed_emails:
processed_emails.add(email_str)
Logger.info( Logger.info(
self.sketch_id, self.sketch_id,
{"message": f"[WHOXY] Creating email node: {email}"}, {"message": f"[WHOXY] Creating email node: {email_str}"},
) )
self.create_node( self.create_node(
"email", "email",
"email", "email",
email, email_str,
caption=email, caption=email_str,
type="email", type="email",
) )
self.create_relationship( self.create_relationship(
@@ -525,24 +526,25 @@ class DomainToHistoryTransform(Transform):
individual.full_name, individual.full_name,
"email", "email",
"email", "email",
email, email_str,
"HAS_EMAIL", "HAS_EMAIL",
) )
# Process phone numbers # Process phone numbers
if individual.phone_numbers: if individual.phone_numbers:
for phone in individual.phone_numbers: for phone_obj in individual.phone_numbers:
if phone and phone not in processed_phones: phone_str = phone_obj.number
processed_phones.add(phone) if phone_str and phone_str not in processed_phones:
processed_phones.add(phone_str)
Logger.info( Logger.info(
self.sketch_id, self.sketch_id,
{"message": f"[WHOXY] Creating phone node: {phone}"}, {"message": f"[WHOXY] Creating phone node: {phone_str}"},
) )
self.create_node( self.create_node(
"phone", "phone",
"number", "number",
phone, phone_str,
caption=phone, caption=phone_str,
type="phone", type="phone",
) )
self.create_relationship( self.create_relationship(
@@ -551,7 +553,7 @@ class DomainToHistoryTransform(Transform):
individual.full_name, individual.full_name,
"phone", "phone",
"number", "number",
phone, phone_str,
"HAS_PHONE", "HAS_PHONE",
) )

View File

@@ -1,12 +1,10 @@
from typing import List, Union from typing import List
import whois import whois
from flowsint_core.utils import is_valid_domain
from flowsint_core.core.transform_base import Transform from flowsint_core.core.transform_base import Transform
from flowsint_types.domain import Domain, Domain from flowsint_types.domain import Domain
from flowsint_types.whois import Whois from flowsint_types.whois import Whois
from flowsint_types.email import Email from flowsint_types.email import Email
from flowsint_core.core.logger import Logger from flowsint_core.core.logger import Logger
from datetime import datetime
class WhoisTransform(Transform): class WhoisTransform(Transform):
@@ -28,22 +26,6 @@ class WhoisTransform(Transform):
def key(cls) -> str: def key(cls) -> str:
return "domain" return "domain"
def preprocess(self, data: Union[List[str], List[dict], InputType]) -> InputType:
cleaned: InputType = []
for item in data:
domain_obj = None
if isinstance(item, str):
if is_valid_domain(item):
domain_obj = Domain(domain=item)
elif isinstance(item, dict) and "domain" in item:
if is_valid_domain(item["domain"]):
domain_obj = Domain(domain=item["domain"])
elif isinstance(item, Domain):
domain_obj = item
if domain_obj:
cleaned.append(domain_obj)
return cleaned
async def scan(self, data: InputType) -> OutputType: async def scan(self, data: InputType) -> OutputType:
results: OutputType = [] results: OutputType = []
for domain in data: for domain in data:
@@ -85,8 +67,19 @@ class WhoisTransform(Transform):
else: else:
expiration_date_str = whois_info.expiration_date.isoformat() expiration_date_str = whois_info.expiration_date.isoformat()
# Extract registry domain ID
registry_domain_id = None
if (
hasattr(whois_info, "registry_domain_id")
and whois_info.registry_domain_id
):
registry_domain_id = str(whois_info.registry_domain_id)
elif hasattr(whois_info, "domain_id") and whois_info.domain_id:
registry_domain_id = str(whois_info.domain_id)
whois_obj = Whois( whois_obj = Whois(
domain=domain.domain, domain=domain,
registry_domain_id=registry_domain_id,
registrar=( registrar=(
str(whois_info.registrar) if whois_info.registrar else None str(whois_info.registrar) if whois_info.registrar else None
), ),
@@ -114,23 +107,27 @@ class WhoisTransform(Transform):
continue continue
# Create domain node # Create domain node
self.create_node("domain", "domain", whois_obj.domain, **whois_obj.__dict__) self.create_node(
"domain",
"domain",
whois_obj.domain.domain,
root=whois_obj.domain.root,
type="domain",
)
# Create whois node # Create whois node
whois_key = f"{whois_obj.domain}_{self.sketch_id}" whois_key = f"{whois_obj.domain.domain}_{self.sketch_id}"
whois_label = f"Whois-{whois_obj.domain}"
# Creating unique label if whois_obj.registry_domain_id:
date_format = "%Y-%m-%dT%H:%M:%S" whois_label = whois_obj.registry_domain_id
try: else:
year = datetime.strptime(whois_obj.creation_date, date_format).year whois_label = whois_obj.domain.domain
whois_label = f"{whois_label}-{year}"
except Exception:
continue
self.create_node( self.create_node(
"whois", "whois",
"whois_id", "whois_id",
whois_key, whois_key,
domain=whois_obj.domain, domain=whois_obj.domain.domain,
registry_domain_id=whois_obj.registry_domain_id,
registrar=whois_obj.registrar, registrar=whois_obj.registrar,
org=whois_obj.org, org=whois_obj.org,
city=whois_obj.city, city=whois_obj.city,
@@ -146,7 +143,7 @@ class WhoisTransform(Transform):
self.create_relationship( self.create_relationship(
"domain", "domain",
"domain", "domain",
whois_obj.domain, whois_obj.domain.domain,
"whois", "whois",
"whois_id", "whois_id",
whois_key, whois_key,
@@ -161,7 +158,7 @@ class WhoisTransform(Transform):
whois_obj.org, whois_obj.org,
country=whois_obj.country, country=whois_obj.country,
founding_date=whois_obj.creation_date, founding_date=whois_obj.creation_date,
description=f"Organization from WHOIS data for {whois_obj.domain}", description=f"Organization from WHOIS data for {whois_obj.domain.domain}",
caption=whois_obj.org, caption=whois_obj.org,
type="organization", type="organization",
) )
@@ -173,12 +170,12 @@ class WhoisTransform(Transform):
whois_obj.org, whois_obj.org,
"domain", "domain",
"domain", "domain",
whois_obj.domain, whois_obj.domain.domain,
"HAS_DOMAIN", "HAS_DOMAIN",
) )
self.log_graph_message( self.log_graph_message(
f"{whois_obj.domain} -> {whois_obj.org} (organization)" f"{whois_obj.domain.domain} -> {whois_obj.org} (organization)"
) )
if whois_obj.email: if whois_obj.email:
@@ -196,7 +193,7 @@ class WhoisTransform(Transform):
) )
self.log_graph_message( self.log_graph_message(
f"WHOIS for {whois_obj.domain} -> registrar: {whois_obj.registrar} org: {whois_obj.org} city: {whois_obj.city} country: {whois_obj.country} creation_date: {whois_obj.creation_date} expiration_date: {whois_obj.expiration_date}" f"WHOIS for {whois_obj.domain.domain} -> registry_id: {whois_obj.registry_domain_id} registrar: {whois_obj.registrar} org: {whois_obj.org} city: {whois_obj.city} country: {whois_obj.country} creation_date: {whois_obj.creation_date} expiration_date: {whois_obj.expiration_date}"
) )
return results return results

View File

@@ -1,6 +1,6 @@
import os import os
import re import re
from typing import Any, List, Union, Dict, Set, Optional from typing import Any, List, Dict, Set, Optional
from flowsint_core.core.transform_base import Transform from flowsint_core.core.transform_base import Transform
from flowsint_types.domain import Domain from flowsint_types.domain import Domain
from flowsint_types.individual import Individual from flowsint_types.individual import Individual
@@ -68,20 +68,6 @@ class EmailToDomainsTransform(Transform):
def key(cls) -> str: def key(cls) -> str:
return "email" return "email"
def preprocess(self, data: Union[List[str], List[dict], InputType]) -> InputType:
cleaned: InputType = []
for item in data:
email_obj = None
if isinstance(item, str):
email_obj = Email(email=item)
elif isinstance(item, dict) and "email" in item:
email_obj = Email(email=item["email"])
elif isinstance(item, Email):
email_obj = item
if email_obj:
cleaned.append(email_obj)
return cleaned
async def scan(self, data: InputType) -> OutputType: async def scan(self, data: InputType) -> OutputType:
"""Find domains related to emails using whoxy api.""" """Find domains related to emails using whoxy api."""
domains: OutputType = [] domains: OutputType = []
@@ -360,16 +346,17 @@ class EmailToDomainsTransform(Transform):
# Process email addresses # Process email addresses
if individual.email_addresses: if individual.email_addresses:
for email in individual.email_addresses: for email_obj in individual.email_addresses:
if email and email not in processed_emails: email_str = email_obj.email
processed_emails.add(email) if email_str and email_str not in processed_emails:
processed_emails.add(email_str)
# Create email node # Create email node
self.create_node( self.create_node(
"email", "email",
"email", "email",
email, email_str,
caption=email, caption=email_str,
type="email", type="email",
) )
@@ -380,22 +367,23 @@ class EmailToDomainsTransform(Transform):
individual.full_name, individual.full_name,
"email", "email",
"email", "email",
email, email_str,
"HAS_EMAIL", "HAS_EMAIL",
) )
# Process phone numbers # Process phone numbers
if individual.phone_numbers: if individual.phone_numbers:
for phone in individual.phone_numbers: for phone_obj in individual.phone_numbers:
if phone and phone not in processed_phones: phone_str = phone_obj.number
processed_phones.add(phone) if phone_str and phone_str not in processed_phones:
processed_phones.add(phone_str)
# Create phone node # Create phone node
self.create_node( self.create_node(
"phone", "phone",
"number", "number",
phone, phone_str,
caption=phone, caption=phone_str,
type="phone", type="phone",
) )
@@ -406,7 +394,7 @@ class EmailToDomainsTransform(Transform):
individual.full_name, individual.full_name,
"phone", "phone",
"number", "number",
phone, phone_str,
"HAS_PHONE", "HAS_PHONE",
) )

View File

@@ -375,16 +375,17 @@ class IndividualToDomainsTransform(Transform):
# Process email addresses # Process email addresses
if contact_individual.email_addresses: if contact_individual.email_addresses:
for email in contact_individual.email_addresses: for email_obj in contact_individual.email_addresses:
if email and email not in processed_emails: email_str = email_obj.email
processed_emails.add(email) if email_str and email_str not in processed_emails:
processed_emails.add(email_str)
# Create email node # Create email node
self.create_node( self.create_node(
"email", "email",
"email", "email",
email, email_str,
email=email, email=email_str,
) )
# Create relationship between individual and email # Create relationship between individual and email
@@ -394,22 +395,23 @@ class IndividualToDomainsTransform(Transform):
contact_individual.full_name, contact_individual.full_name,
"email", "email",
"email", "email",
email, email_str,
"HAS_EMAIL", "HAS_EMAIL",
) )
# Process phone numbers # Process phone numbers
if contact_individual.phone_numbers: if contact_individual.phone_numbers:
for phone in contact_individual.phone_numbers: for phone_obj in contact_individual.phone_numbers:
if phone and phone not in processed_phones: phone_str = phone_obj.number
processed_phones.add(phone) if phone_str and phone_str not in processed_phones:
processed_phones.add(phone_str)
# Create phone node # Create phone node
self.create_node( self.create_node(
"phone", "phone",
"number", "number",
phone, phone_str,
number=phone, number=phone_str,
) )
# Create relationship between individual and phone # Create relationship between individual and phone
@@ -419,7 +421,7 @@ class IndividualToDomainsTransform(Transform):
contact_individual.full_name, contact_individual.full_name,
"phone", "phone",
"number", "number",
phone, phone_str,
"HAS_PHONE", "HAS_PHONE",
) )

View File

@@ -1,6 +1,6 @@
import os import os
import re import re
from typing import Any, List, Union, Dict, Set, Optional from typing import Any, List, Dict, Set, Optional
from flowsint_core.core.transform_base import Transform from flowsint_core.core.transform_base import Transform
from flowsint_types.domain import Domain from flowsint_types.domain import Domain
from flowsint_types.organization import Organization from flowsint_types.organization import Organization
@@ -67,20 +67,6 @@ class OrgToDomainsTransform(Transform):
def key(cls) -> str: def key(cls) -> str:
return "name" return "name"
def preprocess(self, data: Union[List[str], List[dict], InputType]) -> InputType:
cleaned: InputType = []
for item in data:
org_obj = None
if isinstance(item, str):
org_obj = Organization(name=item)
elif isinstance(item, dict) and "name" in item:
org_obj = Organization(name=item["name"])
elif isinstance(item, Organization):
org_obj = item
if org_obj:
cleaned.append(org_obj)
return cleaned
async def scan(self, data: InputType) -> OutputType: async def scan(self, data: InputType) -> OutputType:
"""Find domains related to organizations using whoxy api.""" """Find domains related to organizations using whoxy api."""
domains: OutputType = [] domains: OutputType = []
@@ -526,18 +512,19 @@ class OrgToDomainsTransform(Transform):
# Process email addresses # Process email addresses
if individual.email_addresses: if individual.email_addresses:
for email in individual.email_addresses: for email_obj in individual.email_addresses:
if email and email not in processed_emails: email_str = email_obj.email
processed_emails.add(email) if email_str and email_str not in processed_emails:
processed_emails.add(email_str)
Logger.info( Logger.info(
self.sketch_id, self.sketch_id,
{"message": f"[WHOXY] Creating email node: {email}"}, {"message": f"[WHOXY] Creating email node: {email_str}"},
) )
self.create_node( self.create_node(
"email", "email",
"email", "email",
email, email_str,
caption=email, caption=email_str,
type="email", type="email",
) )
self.create_relationship( self.create_relationship(
@@ -546,24 +533,25 @@ class OrgToDomainsTransform(Transform):
individual.full_name, individual.full_name,
"email", "email",
"email", "email",
email, email_str,
"HAS_EMAIL", "HAS_EMAIL",
) )
# Process phone numbers # Process phone numbers
if individual.phone_numbers: if individual.phone_numbers:
for phone in individual.phone_numbers: for phone_obj in individual.phone_numbers:
if phone and phone not in processed_phones: phone_str = phone_obj.number
processed_phones.add(phone) if phone_str and phone_str not in processed_phones:
processed_phones.add(phone_str)
Logger.info( Logger.info(
self.sketch_id, self.sketch_id,
{"message": f"[WHOXY] Creating phone node: {phone}"}, {"message": f"[WHOXY] Creating phone node: {phone_str}"},
) )
self.create_node( self.create_node(
"phone", "phone",
"number", "number",
phone, phone_str,
caption=phone, caption=phone_str,
type="phone", type="phone",
) )
self.create_relationship( self.create_relationship(
@@ -572,7 +560,7 @@ class OrgToDomainsTransform(Transform):
individual.full_name, individual.full_name,
"phone", "phone",
"number", "number",
phone, phone_str,
"HAS_PHONE", "HAS_PHONE",
) )