feat(enrichers): to_website additional infos

This commit is contained in:
dextmorgn
2025-12-03 12:36:10 +01:00
parent 1d7e7789e6
commit 20357cd319

View File

@@ -1,5 +1,6 @@
from typing import List, Union
from typing import Dict, List, Union
import requests
from bs4 import BeautifulSoup
from flowsint_core.utils import is_valid_domain
from flowsint_core.core.enricher_base import Enricher
from flowsint_enrichers.registry import flowsint_enricher
@@ -28,18 +29,98 @@ class DomainToWebsiteEnricher(Enricher):
def key(cls) -> str:
return "domain"
def _extract_page_info(self, html_content: str) -> Dict[str, any]:
"""Extract title, description, content and technologies from HTML."""
soup = BeautifulSoup(html_content, 'html.parser')
# Extract title
title = None
title_tag = soup.find('title')
if title_tag:
title = title_tag.get_text().strip()
# Extract meta description
description = None
meta_desc = soup.find('meta', attrs={'name': 'description'})
if meta_desc and meta_desc.get('content'):
description = meta_desc.get('content').strip()
# Extract text content (remove scripts and styles)
for script in soup(['script', 'style']):
script.decompose()
content = soup.get_text(separator=' ', strip=True)
# Limit content to first 5000 characters
if len(content) > 5000:
content = content[:5000] + "..."
# Detect technologies
technologies = []
# Check for common frameworks and libraries
if soup.find('meta', attrs={'name': 'generator'}):
generator = soup.find('meta', attrs={'name': 'generator'}).get('content')
if generator:
technologies.append(generator)
# Check for React
if soup.find('div', id='root') or soup.find('div', id='react-root'):
technologies.append('React')
# Check for Vue.js
if soup.find(attrs={'data-v-'}):
technologies.append('Vue.js')
# Check for Angular
if soup.find(attrs={'ng-app'}) or soup.find(attrs={'ng-version'}):
technologies.append('Angular')
# Check for WordPress
if soup.find('meta', attrs={'name': 'generator', 'content': lambda x: x and 'WordPress' in x}):
technologies.append('WordPress')
return {
'title': title,
'description': description,
'content': content,
'technologies': technologies
}
async def scan(self, data: List[InputType]) -> List[OutputType]:
results: List[OutputType] = []
for domain in data:
try:
website_data = {
'url': f"https://{domain.domain}",
'domain': domain,
'active': False
}
# Try HTTPS first
try:
https_url = f"https://{domain.domain}"
response = requests.head(
response = requests.get(
https_url, timeout=10, allow_redirects=True
)
if response.status_code < 400:
results.append(Website(url=https_url, domain=domain, active=True))
website_data['url'] = https_url
website_data['active'] = True
website_data['status_code'] = response.status_code
# Extract relevant headers
website_data['headers'] = {
'content-type': response.headers.get('content-type'),
'server': response.headers.get('server'),
'x-powered-by': response.headers.get('x-powered-by')
}
# Remove None values
website_data['headers'] = {k: v for k, v in website_data['headers'].items() if v}
# Extract page information
page_info = self._extract_page_info(response.text)
website_data.update(page_info)
results.append(Website(**website_data))
continue
except requests.RequestException:
pass
@@ -47,15 +128,33 @@ class DomainToWebsiteEnricher(Enricher):
# Try HTTP if HTTPS fails
try:
http_url = f"http://{domain.domain}"
response = requests.head(http_url, timeout=10, allow_redirects=True)
response = requests.get(http_url, timeout=10, allow_redirects=True)
if response.status_code < 400:
results.append(Website(url=http_url, domain=domain, active=True))
website_data['url'] = http_url
website_data['active'] = True
website_data['status_code'] = response.status_code
# Extract relevant headers
website_data['headers'] = {
'content-type': response.headers.get('content-type'),
'server': response.headers.get('server'),
'x-powered-by': response.headers.get('x-powered-by')
}
# Remove None values
website_data['headers'] = {k: v for k, v in website_data['headers'].items() if v}
# Extract page information
page_info = self._extract_page_info(response.text)
website_data.update(page_info)
results.append(Website(**website_data))
continue
except requests.RequestException:
pass
# If both fail, still add HTTPS URL as default
results.append(Website(url=f"https://{domain.domain}", domain=domain, active=False))
results.append(Website(**website_data))
except Exception as e:
Logger.error(