feat: flowsint-transforms, flowsint-types, flowsint-core

This commit is contained in:
dextmorgn
2025-08-13 17:04:11 +02:00
parent c770b752ed
commit 5b62d9efba
200 changed files with 26326 additions and 1120 deletions

View File

@@ -0,0 +1,7 @@
# flowsint-api tests
Run the test suite from the directory that contains `tests/`:
```bash
python -m pytest tests/ -v --tb=short
```

View File

View File

@@ -0,0 +1,9 @@
import pytest
from tests.logger import TestLogger
@pytest.fixture(autouse=True)
def mock_logger(monkeypatch):
    """Swap the production ``Logger`` for ``TestLogger`` in every test.

    The fixture is ``autouse``, so tests get it without requesting it. It also
    replaces ``emit_event_task.delay`` with a callable that does nothing.
    """

    def _do_nothing(*args, **kwargs):
        # Accept any call signature and return None.
        return None

    monkeypatch.setattr("flowsint_core.core.logger.Logger", TestLogger)
    monkeypatch.setattr("flowsint_core.core.logger.emit_event_task.delay", _do_nothing)

View File

@@ -0,0 +1,28 @@
import sys
import os
import asyncio
# When executed directly as a script, put the parent of this file's directory
# at the front of sys.path before the imports that follow.
if __name__ == "__main__":
    sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from flowsint_types.domain import Domain
from flowsint_types.ip import Ip
from flowsint_transforms.domains.resolve import ResolveScanner
async def main():
    """Call ``ResolveScanner.postprocess`` once with hand-built sample data."""
    # Sample inputs: one domain and two IP addresses.
    sample_domains = [Domain(domain="adaltas.com")]
    sample_ips = [Ip(address='12.23.34.45'), Ip(address='56.67.78.89')]

    resolver = ResolveScanner("sketch_123", "scan_123")

    # Only the first IP is passed so the list lengths match the domains list.
    first_ip_only = sample_ips[:1]
    resolver.postprocess(first_ip_only, sample_domains)

    print("Postprocess test completed successfully!")


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -0,0 +1,57 @@
from typing import Literal, Union, Any
from uuid import UUID
# Log levels accepted by the logger helpers below.
EventLevel = Literal["info", "warn", "error", "success", "debug"]

# Prefix printed for each level; the single source of truth for TestLogger.
LEVEL_MAP = {
    "info": "INFO",
    "warn": "WARN",
    "error": "FAILED",
    "success": "SUCCESS",
    "debug": "DEBUG",
}


class TestLogger:
    """Print-only logger used in tests.

    Every public method takes a sketch id (ignored) and a message, and prints
    the message prefixed with the label ``LEVEL_MAP`` assigns to that level.
    """

    @staticmethod
    def _format_message(type: str, message: str) -> str:
        """Return ``message`` prefixed with the upper-cased ``type`` in brackets."""
        return f"[{type.upper()}] {message}"

    @staticmethod
    def _create_log(sketch_id: Union[str, UUID], log_type: str, content: str) -> Any:
        """Return a dummy log object whose only attribute is ``id == 'dummy_id'``."""

        class DummyLog:
            def __init__(self):
                self.id = 'dummy_id'

        return DummyLog()

    @staticmethod
    def _emit(level: EventLevel, message: str) -> None:
        """Print ``message`` with the prefix that ``LEVEL_MAP`` gives ``level``."""
        print(TestLogger._format_message(LEVEL_MAP[level], message))

    @staticmethod
    def info(sketch_id: Union[str, UUID], message: str):
        """Log an info message."""
        TestLogger._emit("info", message)

    @staticmethod
    def error(sketch_id: Union[str, UUID], message: str):
        """Log an error message (printed with the ``FAILED`` prefix)."""
        TestLogger._emit("error", message)

    @staticmethod
    def warn(sketch_id: Union[str, UUID], message: str):
        """Log a warning message.

        Uses the ``WARN`` prefix from ``LEVEL_MAP``; this method previously
        hard-coded ``WARNING``, which disagreed with the map.
        """
        TestLogger._emit("warn", message)

    @staticmethod
    def debug(sketch_id: Union[str, UUID], message: str):
        """Log a debug message."""
        TestLogger._emit("debug", message)

    @staticmethod
    def success(sketch_id: Union[str, UUID], message: str):
        """Log a success message."""
        TestLogger._emit("success", message)

View File

@@ -0,0 +1,64 @@
from flowsint_transforms.crypto.wallet_to_nfts import CryptoWalletAddressToNFTs
from flowsint_types.wallet import CryptoWallet, CryptoNFT
from pydantic import HttpUrl
# Module-level scanner instance used by the tests in this file.
scanner = CryptoWalletAddressToNFTs("sketch_123", "scan_123")
def test_wallet_address_to_transactions_name():
    """The NFT scanner reports ``wallet_to_nfts`` as its name."""
    reported_name = scanner.name()
    assert reported_name == "wallet_to_nfts"
def test_wallet_address_to_transactions_category():
    """The NFT scanner reports ``crypto`` as its category."""
    reported_category = scanner.category()
    assert reported_category == "crypto"
def test_wallet_address_to_transactions_key():
    """The NFT scanner reports ``address`` as its key."""
    reported_key = scanner.key()
    assert reported_key == "address"
def test_preprocess_with_string():
    """A bare address string is preprocessed into one ``CryptoWallet``."""
    address = "0x742d35Cc6634C0532925a3b844Bc454e4438f44e"
    wallets = scanner.preprocess([address])

    assert len(wallets) == 1
    wallet = wallets[0]
    assert isinstance(wallet, CryptoWallet)
    assert wallet.address == address
def test_preprocess_with_dict():
    """A dict with an ``address`` key is preprocessed into one ``CryptoWallet``."""
    address = "0x742d35Cc6634C0532925a3b844Bc454e4438f44e"
    wallets = scanner.preprocess([{"address": address}])

    assert len(wallets) == 1
    wallet = wallets[0]
    assert isinstance(wallet, CryptoWallet)
    assert wallet.address == address
def test_preprocess_with_wallet_object():
    """An existing ``CryptoWallet`` comes out of ``preprocess`` as a ``CryptoWallet``."""
    address = "0x742d35Cc6634C0532925a3b844Bc454e4438f44e"
    given = CryptoWallet(address=address)
    wallets = scanner.preprocess([given])

    assert len(wallets) == 1
    wallet = wallets[0]
    assert isinstance(wallet, CryptoWallet)
    assert wallet.address == address
def test_scan_mocked_transactions(monkeypatch):
    """``scan`` returns one list per wallet holding the NFTs from ``_get_nfts``."""
    owner_address = "0x742d35Cc6634C0532925a3b844Bc454e4438f44e"

    def fake_get_nfts(address):
        # Stand-in for the scanner's _get_nfts: always one fixed NFT.
        nft = CryptoNFT(
            wallet=CryptoWallet(address=owner_address),
            contract_address="0x123",
            token_id="1",
            collection_name="Test Collection",
            metadata_url="https://example.com/metadata.json",
            image_url="https://example.com/image.png",
            name="Test NFT",
        )
        return [nft]

    monkeypatch.setattr(scanner, "_get_nfts", fake_get_nfts)

    result = scanner.scan([CryptoWallet(address=owner_address)])

    assert len(result) == 1
    assert len(result[0]) == 1
    found = result[0][0]
    assert found.contract_address == "0x123"
    assert found.collection_name == "Test Collection"
    assert found.metadata_url == HttpUrl("https://example.com/metadata.json")
    assert found.image_url == HttpUrl("https://example.com/image.png")
    assert found.name == "Test NFT"

View File

@@ -0,0 +1,84 @@
import pytest
from flowsint_transforms.crypto.wallet_to_transactions import CryptoWalletAddressToTransactions
from flowsint_types.wallet import CryptoWallet, CryptoWalletTransaction
# Module-level scanner instance used by the tests in this file; the API key
# value is a placeholder string.
scanner = CryptoWalletAddressToTransactions("sketch_123", "scan_123", params={"ETHERSCAN_API_KEY": "ta-clef-api"},)
def test_wallet_address_to_transactions_name():
    """The transactions scanner reports ``wallet_to_transactions`` as its name."""
    reported_name = scanner.name()
    assert reported_name == "wallet_to_transactions"
def test_wallet_address_to_transactions_category():
    """The transactions scanner reports ``CryptoCryptoWallet`` as its category."""
    reported_category = scanner.category()
    assert reported_category == "CryptoCryptoWallet"
def test_wallet_address_to_transactions_key():
    """The transactions scanner reports ``address`` as its key."""
    reported_key = scanner.key()
    assert reported_key == "address"
def test_preprocess_with_string():
    """A bare address string is preprocessed into one ``CryptoWallet``."""
    address = "0x742d35Cc6634C0532925a3b844Bc454e4438f44e"
    wallets = scanner.preprocess([address])

    assert len(wallets) == 1
    first = wallets[0]
    assert isinstance(first, CryptoWallet)
    assert first.address == address
def test_preprocess_with_dict():
    """A dict with an ``address`` key is preprocessed into one ``CryptoWallet``."""
    address = "0x742d35Cc6634C0532925a3b844Bc454e4438f44e"
    wallets = scanner.preprocess([{"address": address}])

    assert len(wallets) == 1
    first = wallets[0]
    assert isinstance(first, CryptoWallet)
    assert first.address == address
def test_preprocess_with_wallet_object():
    """An existing ``CryptoWallet`` comes out of ``preprocess`` as a ``CryptoWallet``."""
    address = "0x742d35Cc6634C0532925a3b844Bc454e4438f44e"
    given = CryptoWallet(address=address)
    wallets = scanner.preprocess([given])

    assert len(wallets) == 1
    first = wallets[0]
    assert isinstance(first, CryptoWallet)
    assert first.address == address
@pytest.mark.asyncio
async def test_scan_mocked_transactions(monkeypatch):
    """``scan`` returns one list per wallet holding what ``_get_transactions`` yields."""
    sender = "0x742d35Cc6634C0532925a3b844Bc454e4438f44e"
    receiver = "0x456"

    async def fake_get_transactions(address, api_key):
        # Stand-in for _get_transactions, which takes an address and an API key.
        transaction = CryptoWalletTransaction(
            hash="0x123",
            source=CryptoWallet(address=sender),
            target=CryptoWallet(address=receiver),
            value=1.0,  # 1 ETH
            timestamp="1234567890",
            block_number="12345",
            block_hash="0xabc",
            nonce="1",
            transaction_index="0",
            gas="21000",
            gas_price="20000000000",
            gas_used="21000",
            cumulative_gas_used="21000",
            input="0x",
            contract_address=None,
        )
        return [transaction]

    monkeypatch.setattr(scanner, "_get_transactions", fake_get_transactions)

    result = await scanner.scan([CryptoWallet(address=sender)])

    assert len(result) == 1
    assert len(result[0]) == 1
    found = result[0][0]
    assert found.hash == "0x123"
    assert found.source.address == sender
    assert found.target.address == receiver
    assert found.value == 1.0
    assert found.timestamp == "1234567890"
def test_scanner_requires_api_key():
    """Constructing the scanner with empty params raises ``ValueError``."""
    expected_message = "Scanner wallet_to_transactions received invalid params"
    empty_params = {}
    with pytest.raises(ValueError, match=expected_message):
        CryptoWalletAddressToTransactions("sketch_123", "scan_123", params=empty_params)
def test_scanner_with_invalid_api_key_type():
    """Constructing the scanner with a non-string API key raises ``ValueError``."""
    expected_message = "Scanner wallet_to_transactions received invalid params"
    wrong_type_params = {"ETHERSCAN_API_KEY": 123}
    with pytest.raises(ValueError, match=expected_message):
        CryptoWalletAddressToTransactions("sketch_123", "scan_123", params=wrong_type_params)

View File

@@ -0,0 +1,147 @@
from flowsint_transforms.domains.resolve import ResolveScanner
from flowsint_types.domain import Domain
from flowsint_types.ip import Ip
from typing import List
import pytest
# Module-level scanner instance used by the function-style tests in this file.
scanner = ResolveScanner("sketch_123", "scan_123")
def test_preprocess_valid_domains():
    """Valid ``Domain`` objects come out of ``preprocess`` in the same order."""
    domains = [Domain(domain=name) for name in ("example.com", "example2.com")]

    result = scanner.preprocess(domains)

    names_out = [item.domain for item in result]
    names_in = [item.domain for item in domains]
    assert names_out == names_in
def test_unprocessed_valid_domains():
    """Plain domain strings are preprocessed into equal ``Domain`` objects."""
    raw_names = ["example.com", "example2.com"]

    result = scanner.preprocess(raw_names)

    expected = [Domain(domain=name) for name in raw_names]
    assert list(result) == expected
def test_preprocess_invalid_domains():
    """``preprocess`` keeps the valid domains and drops ``invalid_domain``."""
    candidates = [
        Domain(domain="example.com"),
        Domain(domain="invalid_domain"),
        Domain(domain="example.org"),
    ]

    kept = [item.domain for item in scanner.preprocess(candidates)]

    assert "example.com" in kept
    assert "example.org" in kept
    assert "invalid_domain" not in kept
def test_preprocess_multiple_formats():
    """``preprocess`` accepts dicts, ``Domain`` objects and strings together.

    A dict without a ``domain`` key must not contribute a result.
    """
    mixed_inputs = [
        {"domain": "example.com"},
        {"invalid_key": "example.io"},
        Domain(domain="example.org"),
        "example.org",
    ]

    kept = [item.domain for item in scanner.preprocess(mixed_inputs)]

    assert "example.com" in kept
    assert "example.org" in kept
    assert "invalid_domain" not in kept
    assert "example.io" not in kept
@pytest.mark.asyncio
async def test_scan_returns_ip(monkeypatch):
    """``execute`` returns a list whose first item carries the mocked address."""
    resolved_address = "12.23.34.45"

    def fake_gethostbyname(domain):
        # Mock resolver: returns the same fixed IP for any domain.
        return resolved_address

    monkeypatch.setattr("socket.gethostbyname", fake_gethostbyname)

    output = await scanner.execute([Domain(domain="example.com")])
    print(output)

    assert isinstance(output, list)
    assert output[0].address == resolved_address
def test_schemas():
    """Input schema describes ``Domain``; output schema describes ``Ip``.

    Only the type name, the list shape of ``properties`` and one key property
    are checked, not the full schema.
    """
    checks = (
        (scanner.input_schema(), 'Domain', 'domain'),
        (scanner.output_schema(), 'Ip', 'address'),
    )
    for schema, type_name, required_property in checks:
        assert schema['type'] == type_name
        assert isinstance(schema['properties'], list)
        property_names = [prop['name'] for prop in schema['properties']]
        assert required_property in property_names
class TestResolveInputOutputTypes:
    """Checks on ``ResolveScanner``'s ``InputType``/``OutputType`` and its schemas."""

    @staticmethod
    def _property_names(schema):
        """Return the ``name`` of every entry in ``schema["properties"]``."""
        return [entry["name"] for entry in schema["properties"]]

    def test_input_output_types_are_defined(self):
        """Both type attributes exist and have the expected list types."""
        for attribute in ('InputType', 'OutputType'):
            assert hasattr(ResolveScanner, attribute)
        assert ResolveScanner.InputType == List[Domain]
        assert ResolveScanner.OutputType == List[Ip]

    def test_schemas_use_generate_methods(self):
        """The generate methods run and return dicts with the expected type names."""
        generated_input = ResolveScanner.generate_input_schema()
        generated_output = ResolveScanner.generate_output_schema()

        assert isinstance(generated_input, dict)
        assert isinstance(generated_output, dict)
        assert generated_input["type"] == "Domain"
        assert generated_output["type"] == "Ip"

    def test_schema_methods_return_same_as_generate_methods(self):
        """``input_schema``/``output_schema`` equal their generate counterparts."""
        assert ResolveScanner.input_schema() == ResolveScanner.generate_input_schema()
        assert ResolveScanner.output_schema() == ResolveScanner.generate_output_schema()

    def test_input_schema_properties(self):
        """The input schema lists a ``domain`` property."""
        names = self._property_names(ResolveScanner.input_schema())
        assert "domain" in names

    def test_output_schema_properties(self):
        """The output schema lists an ``address`` property."""
        names = self._property_names(ResolveScanner.output_schema())
        assert "address" in names

    def test_type_accessibility_from_instance(self):
        """Types and schema generation are reachable from an instance too."""
        instance = ResolveScanner("test", "test")

        assert instance.InputType == List[Domain]
        assert instance.OutputType == List[Ip]

        # Schema generation must also work when called on the instance.
        generated_input = instance.generate_input_schema()
        generated_output = instance.generate_output_schema()
        assert generated_input["type"] == "Domain"
        assert generated_output["type"] == "Ip"

View File

@@ -0,0 +1,95 @@
from flowsint_transforms.domains.subdomains import SubdomainScanner
from flowsint_types.domain import Domain, Domain
# Module-level scanner instance used by the tests in this file.
scanner = SubdomainScanner("sketch_123", "scan_123")
def test_preprocess_valid_domains():
    """Valid ``Domain`` objects come out of ``preprocess`` in the same order."""
    domains = [Domain(domain=name) for name in ("example.com", "example2.com")]

    result = scanner.preprocess(domains)

    names_out = [item.domain for item in result]
    names_in = [item.domain for item in domains]
    assert names_out == names_in
def test_unprocessed_valid_domains():
    """Plain domain strings are preprocessed into equal ``Domain`` objects."""
    raw_names = ["example.com", "example2.com"]

    result = scanner.preprocess(raw_names)

    expected = [Domain(domain=name) for name in raw_names]
    assert list(result) == expected
def test_preprocess_invalid_domains():
    """``preprocess`` keeps the valid domains and drops ``invalid_domain``."""
    candidates = [
        Domain(domain="example.com"),
        Domain(domain="invalid_domain"),
        Domain(domain="example.org"),
    ]

    kept = [item.domain for item in scanner.preprocess(candidates)]

    assert "example.com" in kept
    assert "example.org" in kept
    assert "invalid_domain" not in kept
def test_preprocess_multiple_formats():
    """``preprocess`` accepts dicts, ``Domain`` objects and strings together.

    A dict without a ``domain`` key must not contribute a result.
    """
    mixed_inputs = [
        {"domain": "example.com"},
        {"invalid_key": "example.io"},
        Domain(domain="example.org"),
        "example.org",
    ]

    kept = [item.domain for item in scanner.preprocess(mixed_inputs)]

    assert "example.com" in kept
    assert "example.org" in kept
    assert "invalid_domain" not in kept
    assert "example.io" not in kept
def test_scan_extracts_subdomains(monkeypatch):
    """``execute`` returns a list of ``Domain`` objects when ``requests.get`` is mocked.

    The mocked payload lists three subdomains of ``example.com`` plus one
    invalid entry.
    """
    mock_response = [
        {"name_value": "mail.example.com\nwww.example.com"},
        {"name_value": "api.example.com"},
        {"name_value": "invalid_domain"},  # should be ignored
    ]

    class MockRequestsResponse:
        """Minimal stand-in for a successful HTTP response."""

        def __init__(self, json_data):
            self._json_data = json_data
            self.status_code = 200

        def json(self):
            return self._json_data

        @property
        def ok(self):
            return True

    def mock_get(url, timeout):
        # The scanner is expected to query a URL that mentions the input domain.
        assert "example.com" in url
        return MockRequestsResponse(mock_response)

    # Patch the network request so no real HTTP call is made.
    monkeypatch.setattr("requests.get", mock_get)

    input_data = [Domain(domain="example.com")]
    # NOTE(review): execute() is called without await here; confirm it is
    # synchronous for this scanner.
    domains = scanner.execute(input_data)

    assert isinstance(domains, list)
    for sub in domains:
        print(sub)
        assert isinstance(sub, Domain)
    print(domains)
    # TODO: assert on the extracted names (mail/www/api.example.com); the
    # earlier check against `domains[0].subdomains` was disabled, and the
    # unused `expected` list that supported it has been removed.

View File

@@ -0,0 +1,81 @@
from flowsint_transforms.domains.whois import WhoisScanner
from flowsint_types.domain import Domain
# Module-level scanner instance used by the tests in this file.
scanner = WhoisScanner("sketch_123", "scan_123")
def test_preprocess_valid_domains():
    """Valid ``Domain`` objects come out of ``preprocess`` in the same order."""
    domains = [Domain(domain=name) for name in ("example.com", "example2.com")]

    result = scanner.preprocess(domains)

    names_out = [item.domain for item in result]
    names_in = [item.domain for item in domains]
    assert names_out == names_in
def test_unprocessed_valid_domains():
    """Plain domain strings are preprocessed into equal ``Domain`` objects."""
    raw_names = ["example.com", "example2.com"]

    result = scanner.preprocess(raw_names)

    expected = [Domain(domain=name) for name in raw_names]
    assert list(result) == expected
def test_preprocess_invalid_domains():
    """``preprocess`` keeps the valid domains and drops ``invalid_domain``."""
    candidates = [
        Domain(domain="example.com"),
        Domain(domain="invalid_domain"),
        Domain(domain="example.org"),
    ]

    kept = [item.domain for item in scanner.preprocess(candidates)]

    assert "example.com" in kept
    assert "example.org" in kept
    assert "invalid_domain" not in kept
def test_preprocess_multiple_formats():
    """``preprocess`` accepts dicts, ``Domain`` objects and strings together.

    A dict without a ``domain`` key must not contribute a result.
    """
    mixed_inputs = [
        {"domain": "example.com"},
        {"invalid_key": "example.io"},
        Domain(domain="example.org"),
        "example.org",
    ]

    kept = [item.domain for item in scanner.preprocess(mixed_inputs)]

    assert "example.com" in kept
    assert "example.org" in kept
    assert "invalid_domain" not in kept
    assert "example.io" not in kept
def test_scan_returns_whois_objects(monkeypatch):
    """``execute`` returns ``Domain`` objects carrying the mocked WHOIS data."""

    def fake_whois(domain):
        # Replaces `whois.whois` so no real network lookup happens.
        return {
            "registrar": "MockRegistrar",
            "org": "MockOrg",
            "city": "MockCity",
            "country": "MockCountry",
            "emails": ["admin@example.com"],
            "creation_date": "2020-01-01",
            "expiration_date": "2030-01-01",
        }

    monkeypatch.setattr("whois.whois", fake_whois)

    # NOTE(review): execute() is called without await here; confirm it is
    # synchronous for this scanner.
    output = scanner.execute([Domain(domain="example.com")])

    assert isinstance(output, list)
    first = output[0]
    assert isinstance(first, Domain)
    assert first.whois.org == "MockOrg"
    assert first.whois.email.email == "admin@example.com"
def test_schemas():
    """Input and output schemas are both the full ``Domain`` schema."""
    domain_schema = {
        'type': 'Domain',
        'properties': [
            {'name': 'domain', 'type': 'string'},
            {'name': 'subdomains', 'type': 'array | null'},
            {'name': 'ips', 'type': 'array | null'},
            {'name': 'whois', 'type': 'Whois | null'},
        ],
    }

    input_schema = scanner.input_schema()
    output_schema = scanner.output_schema()

    assert input_schema == domain_schema
    assert output_schema == domain_schema

View File

@@ -0,0 +1,325 @@
import hashlib
from unittest.mock import Mock, patch
from flowsint_transforms.emails.to_gravatar import EmailToGravatarScanner
from flowsint_types.email import Email
from flowsint_types.gravatar import Gravatar
# Module-level scanner instance (no Neo4j connection passed) used by the tests below.
scanner = EmailToGravatarScanner("sketch_123", "scan_123")
class TestEmailToGravatarScanner:
    """Tests for ``EmailToGravatarScanner``.

    Covers metadata, schemas, ``preprocess``, ``scan``, ``postprocess`` and
    ``execute``. HTTP access is always replaced by patching ``requests.get``.
    """

    @staticmethod
    def _response(status_code):
        """Return a ``Mock`` response whose ``status_code`` is set."""
        response = Mock()
        response.status_code = status_code
        return response

    @staticmethod
    def _md5_hex(email):
        """Return the hex MD5 digest of ``email``."""
        return hashlib.md5(email.encode()).hexdigest()

    @staticmethod
    def _find_property(schema, name):
        """Return the first schema property called ``name``, or ``None``."""
        return next((prop for prop in schema["properties"] if prop["name"] == name), None)

    def test_name(self):
        """The scanner name is ``to_gravatar``."""
        assert EmailToGravatarScanner.name() == "to_gravatar"

    def test_category(self):
        """The scanner category is ``Email``."""
        assert EmailToGravatarScanner.category() == "Email"

    def test_key(self):
        """The scanner key is ``email``."""
        assert EmailToGravatarScanner.key() == "email"

    def test_input_schema(self):
        """The input schema is an ``Email`` schema with a string ``email`` property."""
        schema = EmailToGravatarScanner.input_schema()
        assert schema["type"] == "Email"
        assert "properties" in schema

        email_prop = self._find_property(schema, "email")
        assert email_prop is not None
        assert email_prop["type"] == "string"

    def test_output_schema(self):
        """The output schema is a ``Gravatar`` schema with ``src`` and ``hash``."""
        schema = EmailToGravatarScanner.output_schema()
        assert schema["type"] == "Gravatar"
        assert "properties" in schema

        src_prop = self._find_property(schema, "src")
        hash_prop = self._find_property(schema, "hash")
        assert src_prop is not None
        assert hash_prop is not None

    def test_preprocess_string_emails(self):
        """String inputs become ``Email`` objects in input order."""
        result = scanner.preprocess(["test@example.com", "user@gmail.com"])

        assert len(result) == 2
        assert all(isinstance(item, Email) for item in result)
        assert result[0].email == "test@example.com"
        assert result[1].email == "user@gmail.com"

    def test_preprocess_dict_emails(self):
        """Dicts with an ``email`` key become ``Email`` objects in input order."""
        result = scanner.preprocess([
            {"email": "test@example.com"},
            {"email": "user@gmail.com"},
        ])

        assert len(result) == 2
        assert all(isinstance(item, Email) for item in result)
        assert result[0].email == "test@example.com"
        assert result[1].email == "user@gmail.com"

    def test_preprocess_email_objects(self):
        """``Email`` objects are accepted by ``preprocess``."""
        result = scanner.preprocess([
            Email(email="test@example.com"),
            Email(email="user@gmail.com"),
        ])

        assert len(result) == 2
        assert all(isinstance(item, Email) for item in result)
        assert result[0].email == "test@example.com"
        assert result[1].email == "user@gmail.com"

    def test_preprocess_mixed_formats(self):
        """Strings, dicts and ``Email`` objects can be mixed in one call."""
        result = scanner.preprocess([
            "test@example.com",
            {"email": "user@gmail.com"},
            Email(email="admin@company.com"),
        ])

        assert len(result) == 3
        assert all(isinstance(item, Email) for item in result)
        assert result[0].email == "test@example.com"
        assert result[1].email == "user@gmail.com"
        assert result[2].email == "admin@company.com"

    def test_preprocess_invalid_inputs(self):
        """Only strings and dicts with an ``email`` key survive ``preprocess``."""
        result = scanner.preprocess([
            "not-an-email",
            {"invalid_key": "test@example.com"},
            {"email": "invalid-email"},
            None,
            123,
        ])

        # The address format is not validated: the bare string and the dict
        # with an "email" key are kept, everything else is dropped.
        assert len(result) == 2
        assert result[0].email == "not-an-email"
        assert result[1].email == "invalid-email"

    def test_preprocess_empty_list(self):
        """An empty input list gives an empty result."""
        assert scanner.preprocess([]) == []

    @patch('requests.get')
    def test_scan_successful_gravatar(self, mock_get):
        """A 200 response yields one ``Gravatar`` with the email's MD5 hash."""
        mock_get.return_value = self._response(200)

        result = scanner.scan([Email(email="test@example.com")])

        assert len(result) == 1
        gravatar = result[0]
        assert isinstance(gravatar, Gravatar)
        assert gravatar.hash == self._md5_hex("test@example.com")
        assert "gravatar.com/avatar/" in str(gravatar.src)

    @patch('requests.get')
    def test_scan_failed_request(self, mock_get):
        """A 404 response yields no results."""
        mock_get.return_value = self._response(404)

        result = scanner.scan([Email(email="test@example.com")])

        assert len(result) == 0

    @patch('requests.get')
    def test_scan_request_exception(self, mock_get):
        """An exception raised by the request yields no results."""
        mock_get.side_effect = Exception("Network error")

        result = scanner.scan([Email(email="test@example.com")])

        assert len(result) == 0

    @patch('requests.get')
    def test_scan_multiple_emails(self, mock_get):
        """Three emails with 200 responses give three results and three requests."""
        mock_get.return_value = self._response(200)
        addresses = ("test1@example.com", "test2@example.com", "test3@example.com")

        result = scanner.scan([Email(email=address) for address in addresses])

        assert len(result) == 3
        assert all(isinstance(item, Gravatar) for item in result)
        assert mock_get.call_count == 3

    @patch('requests.get')
    def test_scan_mixed_success_failure(self, mock_get):
        """Only emails whose request returns 200 produce a result."""
        succeeding_hash = self._md5_hex("test1@example.com")

        def respond(url, *args, **kwargs):
            # 200 only when the URL contains the hash of test1@example.com.
            status = 200 if succeeding_hash in url else 404
            return self._response(status)

        mock_get.side_effect = respond

        result = scanner.scan([
            Email(email="test1@example.com"),
            Email(email="test2@example.com"),
        ])

        # One result for the first email (200), none for the second (404).
        assert len(result) == 1
        assert result[0].hash == succeeding_hash

    def test_postprocess_with_neo4j_connection(self):
        """With a Neo4j connection, one query runs per gravatar; results are returned as-is."""
        neo4j_mock = Mock()
        connected_scanner = EmailToGravatarScanner(
            "sketch_123", "scan_123", neo4j_conn=neo4j_mock
        )
        gravatars = [
            Gravatar(src="https://www.gravatar.com/avatar/hash1", hash="hash1"),
            Gravatar(src="https://www.gravatar.com/avatar/hash2", hash="hash2"),
        ]
        original_input = [
            Email(email="test1@example.com"),
            Email(email="test2@example.com"),
        ]

        result = connected_scanner.postprocess(gravatars, original_input)

        assert neo4j_mock.query.call_count == 2
        assert result == gravatars

    def test_postprocess_without_neo4j_connection(self):
        """Without a Neo4j connection the results are returned unchanged."""
        gravatars = [Gravatar(src="https://www.gravatar.com/avatar/hash1", hash="hash1")]

        result = scanner.postprocess(gravatars, [Email(email="test@example.com")])

        assert result == gravatars

    def test_postprocess_missing_original_input(self):
        """An empty original-input list still returns the results unchanged."""
        gravatars = [Gravatar(src="https://www.gravatar.com/avatar/hash1", hash="hash1")]
        no_input = []

        result = scanner.postprocess(gravatars, no_input)

        assert result == gravatars

    def test_postprocess_none_original_input(self):
        """Despite the name, this exercises an empty list rather than ``None``."""
        gravatars = [Gravatar(src="https://www.gravatar.com/avatar/hash1", hash="hash1")]

        # postprocess does not handle a None input properly, so an empty list
        # is used in its place.
        result = scanner.postprocess(gravatars, [])

        assert result == gravatars

    def test_execute_full_workflow(self):
        """``execute`` on a string email returns one ``Gravatar`` with the MD5 hash."""
        with patch('requests.get') as mock_get:
            mock_get.return_value = self._response(200)

            result = scanner.execute(["test@example.com"])

            assert len(result) == 1
            gravatar = result[0]
            assert isinstance(gravatar, Gravatar)
            assert gravatar.hash == self._md5_hex("test@example.com")

    def test_execute_with_invalid_input(self):
        """Strings that are not email addresses still produce ``Gravatar`` results."""
        not_really_emails = ["not-an-email", "also-invalid"]

        with patch('requests.get') as mock_get:
            mock_get.return_value = self._response(200)

            result = scanner.execute(not_really_emails)

            # Any string is treated as an email, so both inputs are looked up.
            assert len(result) == 2
            assert all(isinstance(item, Gravatar) for item in result)

    def test_gravatar_hash_calculation(self):
        """The gravatar hash equals the MD5 hex digest of the email."""
        email = "test@example.com"
        expected_hash = self._md5_hex(email)

        with patch('requests.get') as mock_get:
            mock_get.return_value = self._response(200)

            result = scanner.scan([Email(email=email)])

            assert len(result) == 1
            assert result[0].hash == expected_hash

    def test_gravatar_url_format(self):
        """The gravatar URL is the avatar base URL followed by the hash."""
        email = "test@example.com"
        expected_url = f"https://www.gravatar.com/avatar/{self._md5_hex(email)}"

        with patch('requests.get') as mock_get:
            mock_get.return_value = self._response(200)

            result = scanner.scan([Email(email=email)])

            assert len(result) == 1
            assert str(result[0].src) == expected_url

View File

@@ -0,0 +1,162 @@
import pytest
from unittest.mock import patch, MagicMock
from flowsint_transforms.emails.to_leaks import EmailToBreachesScanner
from flowsint_types.email import Email
from flowsint_types.breach import Breach
# Module-level scanner instance used by the tests in this file.
scanner = EmailToBreachesScanner("sketch_123", "scan_123")
def test_scanner_name():
    """The breaches scanner reports ``to_leaks`` as its name."""
    reported_name = EmailToBreachesScanner.name()
    assert reported_name == "to_leaks"
def test_scanner_category():
    """The breaches scanner reports ``Email`` as its category."""
    reported_category = EmailToBreachesScanner.category()
    assert reported_category == "Email"
def test_scanner_key():
    """The breaches scanner reports ``email`` as its key."""
    reported_key = EmailToBreachesScanner.key()
    assert reported_key == "email"
def test_preprocess_string_emails():
    """String inputs are preprocessed into equal ``Email`` objects."""
    raw_addresses = ["test@example.com", "user@domain.org"]

    result = scanner.preprocess(raw_addresses)

    expected = [Email(email=address) for address in raw_addresses]
    assert result == expected
def test_preprocess_dict_emails():
    """Dicts with an ``email`` key are preprocessed into equal ``Email`` objects."""
    raw_entries = [
        {"email": "test@example.com"},
        {"email": "user@domain.org"},
    ]

    result = scanner.preprocess(raw_entries)

    expected = [Email(email=entry["email"]) for entry in raw_entries]
    assert result == expected
def test_preprocess_email_objects():
    """``Email`` objects come out of ``preprocess`` equal to the input list."""
    given = [Email(email=address) for address in ("test@example.com", "user@domain.org")]

    result = scanner.preprocess(given)

    assert result == given
def test_preprocess_mixed_formats():
    """Mixed input shapes are accepted; a dict without ``email`` is ignored."""
    mixed_inputs = [
        "test@example.com",
        {"email": "user@domain.org"},
        Email(email="admin@company.com"),
        {"invalid_key": "should_be_ignored@test.com"},
    ]

    kept = [item.email for item in scanner.preprocess(mixed_inputs)]

    for address in ("test@example.com", "user@domain.org", "admin@company.com"):
        assert address in kept
    assert "should_be_ignored@test.com" not in kept
# Patch the module the scanner is actually imported from
# (flowsint_transforms.emails.to_leaks), not the stale `src.transforms` path.
@patch('flowsint_transforms.emails.to_leaks.requests.get')
def test_scan_successful_response(mock_get):
    """A 200 response with two breaches yields two ``Breach`` objects.

    The assertions expect breach names to be lower-cased, both on the object
    and inside its ``breach`` mapping.
    """
    # Mock a successful API response listing two breaches.
    mock_response = MagicMock()
    mock_response.status_code = 200
    mock_response.json.return_value = [
        {"Name": "TestBreach", "Title": "Test Breach", "Domain": "test.com"},
        {"Name": "AnotherBreach", "Title": "Another Breach", "Domain": "another.com"},
    ]
    mock_get.return_value = mock_response

    emails = [Email(email="test@example.com")]
    result = scanner.scan(emails)

    assert len(result) == 2
    assert isinstance(result[0], Breach)
    assert isinstance(result[1], Breach)
    assert result[0].name == "testbreach"
    assert result[1].name == "anotherbreach"
    assert result[0].breach["name"] == "testbreach"
    assert result[1].breach["name"] == "anotherbreach"
# Patch the module the scanner is actually imported from
# (flowsint_transforms.emails.to_leaks), not the stale `src.transforms` path.
@patch('flowsint_transforms.emails.to_leaks.requests.get')
def test_scan_no_breaches_found(mock_get):
    """A 404 response (no breaches found) yields an empty result."""
    mock_response = MagicMock()
    mock_response.status_code = 404
    mock_get.return_value = mock_response

    emails = [Email(email="test@example.com")]
    result = scanner.scan(emails)

    assert len(result) == 0
# Patch the module the scanner is actually imported from
# (flowsint_transforms.emails.to_leaks), not the stale `src.transforms` path.
@patch('flowsint_transforms.emails.to_leaks.requests.get')
def test_scan_api_error(mock_get):
    """An exception raised by the HTTP call yields an empty result."""
    mock_get.side_effect = Exception("API Error")

    emails = [Email(email="test@example.com")]
    result = scanner.scan(emails)

    assert len(result) == 0
# Patch the module the scanner is actually imported from
# (flowsint_transforms.emails.to_leaks), not the stale `src.transforms` path.
@patch('flowsint_transforms.emails.to_leaks.requests.get')
def test_scan_missing_name_field(mock_get):
    """A breach entry without ``Name`` is kept and named ``unknown``."""
    mock_response = MagicMock()
    mock_response.status_code = 200
    mock_response.json.return_value = [
        {"Title": "Test Breach", "Domain": "test.com"},  # Missing "Name" field
        {"Name": "ValidBreach", "Title": "Valid Breach", "Domain": "valid.com"},
    ]
    mock_get.return_value = mock_response

    emails = [Email(email="test@example.com")]
    result = scanner.scan(emails)

    assert len(result) == 2
    assert result[0].name == "unknown"  # Should default to "unknown"
    assert result[1].name == "validbreach"  # Should use the provided name
    assert result[0].breach["title"] == "Test Breach"
    assert result[1].breach["name"] == "validbreach"
# The key must be cleared in the module the scanner is imported from;
# patching the stale `src.transforms` path would leave the real key in place.
@patch('flowsint_transforms.emails.to_leaks.HIBP_API_KEY', None)
def test_scan_no_api_key():
    """Test that scanner raises ValueError when HIBP_API_KEY is not set."""
    emails = [Email(email="test@example.com")]
    with pytest.raises(ValueError, match="HIBP_API_KEY not set"):
        scanner.scan(emails)
def test_postprocess():
    """postprocess() returns the breaches unchanged and issues five Neo4j queries."""
    # Replace the Neo4j connection so query calls can be counted.
    scanner.neo4j_conn = MagicMock()

    # (name, title, domain, pwncount) for each breach fed to postprocess().
    breach_specs = [
        ("testbreach", "Test Breach", "test.com", 1000),
        ("anotherbreach", "Another Breach", "another.com", 2000),
    ]
    breaches = [
        Breach(
            name=name,
            title=title,
            domain=domain,
            pwncount=pwncount,
            breach={"name": name, "title": title},
        )
        for name, title, domain, pwncount in breach_specs
    ]
    original_input = [Email(email="test@example.com")]

    result = scanner.postprocess(breaches, original_input)

    assert result == breaches
    # Expected query breakdown:
    # - 2 breach node creation queries
    # - 1 email node creation query
    # - 2 relationship creation queries
    assert scanner.neo4j_conn.query.call_count == 5

View File

@@ -0,0 +1,69 @@
from flowsint_transforms.ips.asn_to_cidrs import AsnToCidrsScanner
from flowsint_types.asn import ASN
# Module-level scanner instance shared by the tests in this file.
scanner = AsnToCidrsScanner("sketch_123", "scan_123")
def test_preprocess_valid_asns():
    """preprocess() keeps ASN objects and preserves their order."""
    inputs = [ASN(number=15169), ASN(number=13335)]

    processed = scanner.preprocess(inputs)

    assert [item.number for item in processed] == [item.number for item in inputs]
def test_unprocessed_valid_asns():
    """preprocess() converts numeric strings into equal ASN objects."""
    raw_numbers = ["15169", "13335"]

    processed = scanner.preprocess(raw_numbers)

    expected = [ASN(number=int(raw)) for raw in raw_numbers]
    assert list(processed) == expected
def test_preprocess_invalid_asns():
    """preprocess() drops an invalid ASN number and keeps the valid ones."""
    # NOTE(review): the invalid ASN is constructed directly, which presumes the
    # ASN model does not reject it at construction time — confirm.
    too_large = 999999999999  # Invalid ASN number
    candidates = [
        ASN(number=15169),
        ASN(number=too_large),
        ASN(number=13335),
    ]

    kept_numbers = [item.number for item in scanner.preprocess(candidates)]

    assert 15169 in kept_numbers
    assert 13335 in kept_numbers
    assert too_large not in kept_numbers
def test_preprocess_multiple_formats():
    """preprocess() accepts dicts, ASN objects and strings, dropping bad dict keys."""
    # The wrongly keyed dict carries a number used nowhere else in the input,
    # so the final assertion can actually detect it leaking through. (The
    # previous check compared the string "invalid_key" against a list of
    # integers and could never fail.)
    wrongly_keyed_number = 64512
    asns = [
        {"number": 15169},
        {"invalid_key": wrongly_keyed_number},
        ASN(number=13335),
        "15169",
    ]

    result = scanner.preprocess(asns)
    result_numbers = [asn.number for asn in result]

    assert 15169 in result_numbers
    assert 13335 in result_numbers
    # Should be filtered out due to invalid key
    assert wrongly_keyed_number not in result_numbers
def test_schemas():
    """The input schema exposes an integer "number"; the output schema exposes "network"."""
    input_schema = scanner.input_schema()
    output_schema = scanner.output_schema()

    # Input schema should have number field
    assert "properties" in input_schema
    number_prop = None
    for candidate in input_schema["properties"]:
        if candidate["name"] == "number":
            number_prop = candidate
            break
    assert number_prop is not None
    assert number_prop["type"] == "integer"

    # Output schema should have network field
    assert "properties" in output_schema
    output_names = [entry["name"] for entry in output_schema["properties"]]
    assert "network" in output_names

View File

@@ -0,0 +1,120 @@
from flowsint_transforms.ips.cidr_to_ips import CidrToIpsScanner
from flowsint_types.cidr import CIDR
from flowsint_types.ip import Ip
from tests.logger import TestLogger
# Test logger and module-level scanner instance shared by the tests in this file.
logger = TestLogger()
scanner = CidrToIpsScanner("sketch_123", "scan_123", logger)
def test_preprocess_valid_cidrs():
    """preprocess() keeps CIDR objects and preserves their order."""
    inputs = [CIDR(network="8.8.8.0/24"), CIDR(network="1.1.1.0/24")]

    processed = scanner.preprocess(inputs)

    assert [item.network for item in processed] == [item.network for item in inputs]
def test_preprocess_unprocessed_valid_cidrs():
    """preprocess() converts CIDR strings into equal CIDR objects."""
    raw_networks = ["8.8.8.0/24", "1.1.1.0/24"]

    processed = scanner.preprocess(raw_networks)

    expected = [CIDR(network=raw) for raw in raw_networks]
    assert list(processed) == expected
def test_preprocess_invalid_cidrs():
    """preprocess() discards strings that are not valid CIDR notation."""
    mixed = [CIDR(network="8.8.8.0/24"), "invalid-cidr", "not-a-cidr"]

    kept_networks = [str(entry.network) for entry in scanner.preprocess(mixed)]

    assert "8.8.8.0/24" in kept_networks
    assert "invalid-cidr" not in kept_networks
    assert "not-a-cidr" not in kept_networks
def test_preprocess_multiple_formats():
    """preprocess() accepts dicts and CIDR objects, dropping bad keys and bad strings."""
    mixed = [
        {"network": "8.8.8.0/24"},
        {"invalid_key": "1.1.1.0/24"},
        CIDR(network="9.9.9.0/24"),
        "InvalidCIDR",
    ]

    kept_networks = [str(entry.network) for entry in scanner.preprocess(mixed)]

    # Accepted entries.
    assert "8.8.8.0/24" in kept_networks
    assert "9.9.9.0/24" in kept_networks
    # Rejected entries.
    assert "1.1.1.0/24" not in kept_networks
    assert "InvalidCIDR" not in kept_networks
def test_scan_extracts_ips(monkeypatch):
    """scan() turns each line of the mocked dnsx output into an Ip object."""
    expected_ips = [
        "8.35.200.12",
        "8.35.200.112",
        "8.35.200.16",
        "8.35.200.170"
    ]
    # One address per line, no trailing newline.
    mock_dnsx_output = (
        "8.35.200.12\n"
        "8.35.200.112\n"
        "8.35.200.16\n"
        "8.35.200.170"
    )

    class MockSubprocessResult:
        returncode = 0

        def __init__(self, stdout):
            self.stdout = stdout

    def mock_subprocess_run(cmd, shell, capture_output, text, timeout):
        # The command is expected to invoke dnsx with the -ptr flag.
        assert "dnsx" in cmd
        assert "-ptr" in cmd
        return MockSubprocessResult(mock_dnsx_output)

    # Patch the subprocess call in the scanner
    monkeypatch.setattr("subprocess.run", mock_subprocess_run)

    ips = scanner.scan([CIDR(network="8.35.200.0/24")])

    assert isinstance(ips, list)
    assert len(ips) == 4
    for found in ips:
        assert isinstance(found, Ip)
        assert found.address in expected_ips
def test_scan_handles_empty_output(monkeypatch):
    """scan() returns an empty list when the subprocess prints nothing."""

    class EmptyResult:
        # Successful exit with no output.
        stdout = ""
        returncode = 0

    def mock_subprocess_run(cmd, shell, capture_output, text, timeout):
        return EmptyResult()

    monkeypatch.setattr("subprocess.run", mock_subprocess_run)

    ips = scanner.scan([CIDR(network="8.8.8.0/24")])

    assert isinstance(ips, list)
    assert len(ips) == 0
def test_scan_handles_subprocess_exception(monkeypatch):
    """scan() returns an empty list when the subprocess call raises."""

    def mock_subprocess_run(cmd, shell, capture_output, text, timeout):
        raise Exception("Subprocess failed")

    monkeypatch.setattr("subprocess.run", mock_subprocess_run)

    ips = scanner.scan([CIDR(network="8.8.8.0/24")])

    assert isinstance(ips, list)
    assert len(ips) == 0

View File

@@ -0,0 +1,78 @@
from flowsint_transforms.ips.geolocation import GeolocationScanner
from flowsint_types.ip import Ip, Ip
# Module-level scanner instance shared by the tests in this file.
scanner = GeolocationScanner("sketch_123", "scan_123")
def test_preprocess_valid_ips():
    """preprocess() keeps Ip objects and preserves their order."""
    inputs = [Ip(address="8.8.8.8"), Ip(address="1.1.1.1")]

    processed = scanner.preprocess(inputs)

    assert [item.address for item in processed] == [item.address for item in inputs]
def test_preprocess_string_ips():
    """preprocess() converts plain address strings into Ip objects, in order."""
    ips = [
        "8.8.8.8",
        "1.1.1.1",
    ]

    result = scanner.preprocess(ips)

    # The extracted addresses must equal the raw input strings.
    assert [ip.address for ip in result] == ips
def test_preprocess_invalid_ips():
    """preprocess() drops an Ip whose address is not a valid IP."""
    # NOTE(review): the invalid Ip is constructed directly, which presumes the
    # Ip model does not reject it at construction time — confirm.
    candidates = [
        Ip(address="8.8.8.8"),
        Ip(address="invalid_ip"),
        Ip(address="1.1.1.1"),
    ]

    kept_addresses = [entry.address for entry in scanner.preprocess(candidates)]

    assert "8.8.8.8" in kept_addresses
    assert "1.1.1.1" in kept_addresses
    assert "invalid_ip" not in kept_addresses
def test_preprocess_multiple_formats():
    """preprocess() accepts dicts, Ip objects and strings, dropping bad dict keys."""
    mixed = [
        {"address": "8.8.8.8"},
        {"invalid_key": "1.2.3.4"},
        Ip(address="1.1.1.1"),
        "1.1.1.1",
    ]

    kept_addresses = [entry.address for entry in scanner.preprocess(mixed)]

    assert "8.8.8.8" in kept_addresses
    assert "1.1.1.1" in kept_addresses
    # The dict with the wrong key must not contribute its value.
    assert "1.2.3.4" not in kept_addresses
def test_scan_returns_ip(monkeypatch):
    """execute() returns an Ip enriched with the mocked location data."""
    # NOTE(review): the test is named after scan() but drives execute();
    # confirm this is intended.
    location = {
        "latitude": 37.386,
        "longitude": -122.0838,
        "country": "US",
        "city": "Mountain View",
        "isp": "Google LLC"
    }

    # Mock of get_location_data
    def mock_get_location_data(address):
        return dict(location)

    monkeypatch.setattr(scanner, "get_location_data", mock_get_location_data)

    output = scanner.execute([Ip(address="8.8.8.8")])

    assert isinstance(output, list)
    first = output[0]
    assert isinstance(first, Ip)
    assert first.address == "8.8.8.8"
    assert first.city == "Mountain View"
    assert first.country == "US"
    assert first.isp == "Google LLC"
def test_schemas():
    """Input and output schemas are both the full Ip schema."""
    input_schema = scanner.input_schema()
    output_schema = scanner.output_schema()

    # Both schemas are compared against the same expected Ip description.
    ip_schema = {
        'type': 'Ip',
        'properties': [
            {'name': 'address', 'type': 'string'},
            {'name': 'latitude', 'type': 'number | null'},
            {'name': 'longitude', 'type': 'number | null'},
            {'name': 'country', 'type': 'string | null'},
            {'name': 'city', 'type': 'string | null'},
            {'name': 'isp', 'type': 'string | null'},
        ],
    }
    assert input_schema == ip_schema
    assert output_schema == ip_schema

View File

@@ -0,0 +1,260 @@
import json
from unittest.mock import Mock
from flowsint_transforms.ips.ip_to_asn import IpToAsnScanner
from flowsint_types.ip import Ip
from flowsint_types.asn import ASN
from flowsint_types.cidr import CIDR
from tests.logger import TestLogger
logger = TestLogger()
# The TestLogger instance is passed to the scanner explicitly; the scanner is
# shared by the tests in this file.
scanner = IpToAsnScanner("sketch_123", "scan_123", logger)
def test_preprocess_valid_ips():
    """preprocess() keeps Ip objects and preserves their order."""
    inputs = [Ip(address="8.8.8.8"), Ip(address="1.1.1.1")]

    processed = scanner.preprocess(inputs)

    assert [item.address for item in processed] == [item.address for item in inputs]
def test_unprocessed_valid_ips():
    """preprocess() converts address strings into equal Ip objects."""
    raw_addresses = ["8.8.8.8", "1.1.1.1"]

    processed = scanner.preprocess(raw_addresses)

    expected = [Ip(address=raw) for raw in raw_addresses]
    assert list(processed) == expected
def test_preprocess_invalid_ips():
    """preprocess() drops an Ip whose address is not a valid IP."""
    # NOTE(review): the invalid Ip is constructed directly, which presumes the
    # Ip model does not reject it at construction time — confirm.
    candidates = [
        Ip(address="8.8.8.8"),
        Ip(address="invalid_ip"),
        Ip(address="192.168.1.1"),
    ]

    kept_addresses = [entry.address for entry in scanner.preprocess(candidates)]

    assert "8.8.8.8" in kept_addresses
    assert "192.168.1.1" in kept_addresses
    assert "invalid_ip" not in kept_addresses
def test_preprocess_multiple_formats():
    """preprocess() accepts dicts, Ip objects and strings, dropping bad dict keys."""
    mixed = [
        {"address": "8.8.8.8"},
        {"invalid_key": "1.1.1.1"},
        Ip(address="192.168.1.1"),
        "10.0.0.1",
    ]

    kept_addresses = [entry.address for entry in scanner.preprocess(mixed)]

    for accepted in ("8.8.8.8", "192.168.1.1", "10.0.0.1"):
        assert accepted in kept_addresses
    # Should be filtered out due to invalid key
    assert "1.1.1.1" not in kept_addresses
def test_scan_extracts_asn_info(monkeypatch):
    """scan() maps the mocked asnmap JSON record onto an ASN object."""
    mock_asnmap_output = {
        "input": "8.8.8.8",
        "as_number": "AS15169",
        "as_name": "GOOGLE",
        "as_country": "US",
        "as_range": ["8.8.8.0/24", "8.8.4.0/24"]
    }

    class MockSubprocessResult:
        returncode = 0

        def __init__(self, stdout):
            self.stdout = stdout

    def mock_subprocess_run(cmd, input, capture_output, text, timeout):
        # asnmap is expected to receive the IP address on stdin.
        assert "asnmap" in cmd
        assert input == "8.8.8.8"
        serialized = json.dumps(mock_asnmap_output)
        return MockSubprocessResult(serialized)

    # Patch the subprocess call in the scanner
    monkeypatch.setattr("subprocess.run", mock_subprocess_run)

    asns = scanner.scan([Ip(address="8.8.8.8")])

    assert isinstance(asns, list)
    assert len(asns) == 1
    found = asns[0]
    assert isinstance(found, ASN)
    assert found.number == 15169  # AS15169 -> 15169
    assert found.name == "GOOGLE"
    assert found.country == "US"
    assert len(found.cidrs) == 2
    assert str(found.cidrs[0].network) == "8.8.8.0/24"
    assert str(found.cidrs[1].network) == "8.8.4.0/24"
def test_scan_handles_no_asn_found(monkeypatch):
    """scan() returns a placeholder "Unknown" ASN when asnmap prints nothing."""

    class MockSubprocessResult:
        returncode = 0

        def __init__(self, stdout):
            self.stdout = stdout

    def mock_subprocess_run(cmd, input, capture_output, text, timeout):
        # Return empty output to simulate no ASN found
        return MockSubprocessResult("")

    monkeypatch.setattr("subprocess.run", mock_subprocess_run)

    asns = scanner.scan([Ip(address="192.168.1.1")])

    assert isinstance(asns, list)
    assert len(asns) == 1
    placeholder = asns[0]
    assert isinstance(placeholder, ASN)
    assert placeholder.number == 0
    assert placeholder.name == "Unknown"
    assert placeholder.country == "Unknown"
    assert len(placeholder.cidrs) == 0
def test_scan_handles_subprocess_exception(monkeypatch):
    """scan() returns a placeholder "Unknown" ASN when the subprocess call raises."""

    def mock_subprocess_run(cmd, input, capture_output, text, timeout):
        raise Exception("Subprocess failed")

    monkeypatch.setattr("subprocess.run", mock_subprocess_run)

    asns = scanner.scan([Ip(address="8.8.8.8")])

    assert isinstance(asns, list)
    assert len(asns) == 1
    placeholder = asns[0]
    assert isinstance(placeholder, ASN)
    assert placeholder.number == 0
    assert placeholder.name == "Unknown"
    assert placeholder.country == "Unknown"
def test_scan_multiple_ips(monkeypatch):
    """scan() returns one ASN per input IP, in input order."""

    def asnmap_record(address, as_number, as_name, cidr):
        # Shape of one mocked asnmap JSON record.
        return {
            "input": address,
            "as_number": as_number,
            "as_name": as_name,
            "as_country": "US",
            "as_range": [cidr]
        }

    mock_responses = {
        "8.8.8.8": asnmap_record("8.8.8.8", "AS15169", "GOOGLE", "8.8.8.0/24"),
        "1.1.1.1": asnmap_record("1.1.1.1", "AS13335", "CLOUDFLARE", "1.1.1.0/24"),
    }

    class MockSubprocessResult:
        returncode = 0

        def __init__(self, stdout):
            self.stdout = stdout

    def mock_subprocess_run(cmd, input, capture_output, text, timeout):
        # Unknown addresses produce empty output.
        if input not in mock_responses:
            return MockSubprocessResult("")
        return MockSubprocessResult(json.dumps(mock_responses[input]))

    monkeypatch.setattr("subprocess.run", mock_subprocess_run)

    asns = scanner.scan([Ip(address="8.8.8.8"), Ip(address="1.1.1.1")])

    assert len(asns) == 2
    first, second = asns
    # Check first ASN
    assert first.number == 15169
    assert first.name == "GOOGLE"
    # Check second ASN
    assert second.number == 13335
    assert second.name == "CLOUDFLARE"
def test_schemas():
    """The input schema exposes a string "address"; the output schema exposes the ASN fields."""
    input_schema = scanner.input_schema()
    output_schema = scanner.output_schema()

    # Input schema should have address field
    assert "properties" in input_schema
    address_prop = None
    for candidate in input_schema["properties"]:
        if candidate["name"] == "address":
            address_prop = candidate
            break
    assert address_prop is not None
    assert address_prop["type"] == "string"

    # Output schema should have ASN fields
    assert "properties" in output_schema
    output_names = [entry["name"] for entry in output_schema["properties"]]
    for field in ("number", "name", "country", "cidrs"):
        assert field in output_names
def test_postprocess_creates_neo4j_relationships(monkeypatch):
    """postprocess() issues one Neo4j query carrying the IP and ASN details."""
    # Install the mock through monkeypatch so it is removed from the shared
    # module-level scanner when the test ends. ``raising=False`` covers the
    # case where the attribute does not exist yet.
    mock_neo4j = Mock()
    monkeypatch.setattr(scanner, "neo4j_conn", mock_neo4j, raising=False)

    input_data = [Ip(address="8.8.8.8")]
    asn_results = [ASN(
        number=15169,
        name="GOOGLE",
        country="US",
        cidrs=[CIDR(network="8.8.8.0/24")]
    )]

    result = scanner.postprocess(asn_results, input_data)

    # Verify Neo4j query was called
    mock_neo4j.query.assert_called_once()

    # The query parameters are the second positional argument of the call.
    params = mock_neo4j.query.call_args[0][1]
    assert params["ip_address"] == "8.8.8.8"
    assert params["asn_number"] == 15169
    assert params["asn_name"] == "GOOGLE"
    assert params["asn_country"] == "US"
    assert params["sketch_id"] == "sketch_123"

    # Should return the same results
    assert result == asn_results
def test_postprocess_skips_unknown_asns(monkeypatch):
    """postprocess() issues no Neo4j query for the placeholder ASN (number 0)."""
    # Install the mock through monkeypatch so it is removed from the shared
    # module-level scanner when the test ends. ``raising=False`` covers the
    # case where the attribute does not exist yet.
    mock_neo4j = Mock()
    monkeypatch.setattr(scanner, "neo4j_conn", mock_neo4j, raising=False)

    input_data = [Ip(address="192.168.1.1")]
    asn_results = [ASN(
        number=0,  # Unknown ASN
        name="Unknown",
        country="Unknown",
        cidrs=[]
    )]

    result = scanner.postprocess(asn_results, input_data)

    # Verify Neo4j query was NOT called for unknown ASN
    mock_neo4j.query.assert_not_called()

    # Should return the same results
    assert result == asn_results

View File

@@ -0,0 +1,30 @@
from flowsint_transforms.organizations.to_infos import OrgToInfosScanner
from flowsint_types.organization import Organization
# Module-level scanner instance shared by the tests in this file.
scanner = OrgToInfosScanner("sketch_123", "scan_123")
def test_preprocess_valid_names():
    """preprocess() accepts an Organization, a dict and a plain string."""
    mixed = [Organization(name="OpenAI"), {"name": "Inria"}, "OVH"]

    processed = scanner.preprocess(mixed)

    names = [entry.name for entry in processed]
    assert names == ["OpenAI", "Inria", "OVH"]
# def test_preprocess_invalid_entries():
# data = [
# {"wrong_key": "value"},
# 123,
# None,
# "",
# {"name": ""},
# ]
# result = scanner.preprocess(data)
# assert result == []
def test_execute():
    """Smoke test: execute() is called with a single plain-string input."""
    # NOTE(review): nothing is asserted about the return value; presumably the
    # test only checks that the call does not raise — confirm.
    payload = ["Karim Terrache"]
    scanner.execute(payload)
    assert True

View File

@@ -0,0 +1,53 @@
from pathlib import Path
from flowsint_transforms.socials.maigret import MaigretScanner
from flowsint_types.social import Social
# Module-level scanner instance shared by the tests in this file.
scanner = MaigretScanner("sketch_123", "scan_123")
def test_unprocessed_valid_usernames():
    """preprocess() converts username strings into equal Social objects."""
    raw_usernames = ["toto123", "DorianXd78"]

    processed = scanner.preprocess(raw_usernames)

    expected = [Social(username=raw) for raw in raw_usernames]
    assert list(processed) == expected
def test_preprocess_invalid_usernames():
    """preprocess() drops a username containing spaces and keeps the others."""
    candidates = [
        Social(username="toto123"),
        Social(username="DorianXd78_Official"),
        Social(username="This is not a username"),
    ]

    kept_usernames = [entry.username for entry in scanner.preprocess(candidates)]

    assert "toto123" in kept_usernames
    assert "DorianXd78_Official" in kept_usernames
    assert "This is not a username" not in kept_usernames
def test_preprocess_multiple_formats():
    """preprocess() accepts dicts, Social objects and strings, dropping bad dict keys."""
    mixed = [
        {"username": "toto123"},
        {"invalid_key": "ValId_UseRnAme"},
        Social(username="DorianXd78_Official"),
        "MySimpleUsername",
    ]

    kept_usernames = [entry.username for entry in scanner.preprocess(mixed)]

    assert "toto123" in kept_usernames
    assert "DorianXd78_Official" in kept_usernames
    # The dict with the wrong key must not contribute its value.
    assert "ValId_UseRnAme" not in kept_usernames
    assert "MySimpleUsername" in kept_usernames
def test_parsing_invalid_output_file():
    """parse_maigret_output() returns an empty list for a path that does not exist."""
    missing_report = Path("/this/path/does/not/exist")
    parsed = scanner.parse_maigret_output("toto123", missing_report)
    assert parsed == []
def test_parsing():
    """parse_maigret_output() extracts two results from the sample report."""
    import pytest

    # The sample report is an external fixture that this test does not
    # create; skip instead of failing on machines where it is absent.
    report_path = Path("/tmp/maigret_test.json")
    if not report_path.exists():
        pytest.skip(f"maigret sample report not found at {report_path}")

    results = scanner.parse_maigret_output("toto123", report_path)

    assert len(results) == 2

View File

@@ -0,0 +1,156 @@
import pytest
from flowsint_transforms.registry import ScannerRegistry
from flowsint_core.core.scanner_base import Scanner
class TestScannerRegistry:
    """Test suite for ScannerRegistry functionality"""

    # Keys every entry returned by ScannerRegistry.list() must expose.
    EXPECTED_INFO_KEYS = (
        "class_name",
        "name",
        "module",
        "doc",
        "category",
        "inputs",
        "outputs",
        "params",
        "params_schema",
        "required_params",
    )

    def test_registry_is_populated(self):
        """Test that the registry is populated with scanners"""
        scanners = ScannerRegistry.list()
        assert len(scanners) > 0
        assert isinstance(scanners, dict)

    def test_list_returns_proper_structure(self):
        """Test that list() returns the expected structure"""
        scanners = ScannerRegistry.list()
        for name, scanner_info in scanners.items():
            # Check that each scanner has the expected keys
            for key in self.EXPECTED_INFO_KEYS:
                assert key in scanner_info
            # Check that name matches the key
            assert scanner_info["name"] == name

    def test_list_by_categories_structure(self):
        """Test that list_by_categories() returns the expected structure"""
        scanners_by_category = ScannerRegistry.list_by_categories()
        assert isinstance(scanners_by_category, dict)
        # Check that each category contains a non-empty list of scanners
        for category, scanners in scanners_by_category.items():
            assert isinstance(scanners, list)
            assert len(scanners) > 0
            for scanner_info in scanners:
                assert "class_name" in scanner_info
                assert "name" in scanner_info
                assert "category" in scanner_info
                assert scanner_info["category"] == category

    def test_list_by_input_type_filtering(self):
        """Test that list_by_input_type() properly filters scanners"""
        # Test with a known input type
        domain_scanners = ScannerRegistry.list_by_input_type("Domain")
        assert isinstance(domain_scanners, list)
        for scanner_info in domain_scanners:
            input_type = scanner_info["inputs"]["type"]
            assert input_type in ["Any", "Domain"]

    def test_scanner_exists_method(self):
        """Test the scanner_exists method"""
        scanners = ScannerRegistry.list()
        # Fail loudly on an empty registry instead of skipping the positive check.
        assert scanners, "Scanner registry is empty"
        # Get a real scanner name from the registry
        real_scanner_name = next(iter(scanners))
        assert ScannerRegistry.scanner_exists(real_scanner_name) is True
        # Test with non-existent scanner
        assert ScannerRegistry.scanner_exists("non_existent_scanner") is False

    def test_get_scanner_valid(self):
        """Test getting a valid scanner instance"""
        scanners = ScannerRegistry.list()
        # Fail loudly on an empty registry instead of passing without checking anything.
        assert scanners, "Scanner registry is empty"
        scanner_name = next(iter(scanners))
        scanner_instance = ScannerRegistry.get_scanner(
            scanner_name,
            sketch_id="test_sketch",
            scan_id="test_scan"
        )
        assert isinstance(scanner_instance, Scanner)
        assert scanner_instance.sketch_id == "test_sketch"
        assert scanner_instance.scan_id == "test_scan"

    def test_get_scanner_invalid(self):
        """Test getting an invalid scanner raises exception"""
        with pytest.raises(Exception, match="Scanner 'invalid_scanner' not found"):
            ScannerRegistry.get_scanner(
                "invalid_scanner",
                sketch_id="test_sketch",
                scan_id="test_scan"
            )

    def test_specific_scanners_are_registered(self):
        """Test that specific expected scanners are registered"""
        scanners = ScannerRegistry.list()
        # Check for some key scanners that should be registered
        expected_scanners = [
            "domain_resolve_scanner",
            "domain_subdomains_scanner",
            "to_whois",
            "ip_geolocation_scanner",
            "maigret_scanner"
        ]
        for expected_scanner in expected_scanners:
            assert expected_scanner in scanners, f"Scanner '{expected_scanner}' not found in registry"

    def test_crypto_scanners_are_registered(self):
        """Test that crypto scanners are registered"""
        scanners = ScannerRegistry.list()
        crypto_scanners = [
            "wallet_to_transactions",
            "wallet_to_nfts"
        ]
        for crypto_scanner in crypto_scanners:
            assert crypto_scanner in scanners, f"Crypto scanner '{crypto_scanner}' not found in registry"

    def test_scanner_categories_are_valid(self):
        """Test that all scanners have valid categories"""
        scanners = ScannerRegistry.list()
        for name, scanner_info in scanners.items():
            category = scanner_info["category"]
            # Only the type is checked: the set of category names is left
            # open since new categories might be added.
            assert isinstance(category, str), f"Scanner '{name}' has invalid category type: {type(category)}"

    def test_scanner_input_output_schemas_exist(self):
        """Test that all scanners have input and output schemas"""
        scanners = ScannerRegistry.list()
        for scanner_info in scanners.values():
            # Input and output schemas share the same minimal shape.
            for schema_key in ("inputs", "outputs"):
                schema = scanner_info[schema_key]
                assert isinstance(schema, dict)
                assert "type" in schema
                assert "properties" in schema

    def test_scanner_required_params_is_boolean(self):
        """Test that required_params returns a boolean for all scanners"""
        scanners = ScannerRegistry.list()
        for name, scanner_info in scanners.items():
            required_params = scanner_info["required_params"]
            assert isinstance(required_params, bool), f"Scanner '{name}' required_params is not boolean: {type(required_params)}"

View File

@@ -0,0 +1,162 @@
import pytest
from unittest.mock import Mock, patch, call
from flowsint_transforms.websites.to_links import WebsiteToLinks
from flowsint_types.website import Website
class MockCrawlResults:
    """Stand-in for a crawl result holding internal and external URL lists."""

    def __init__(self, internal=None, external=None):
        # Falsy arguments (None, empty containers) are replaced by a fresh list.
        self.internal = internal if internal else []
        self.external = external if external else []
class MockCrawler:
    """Crawler replacement that reports a fixed set of URLs without any network access."""

    # URLs reported by extract_urls() and returned by get_results().
    _INTERNAL_URLS = (
        "https://example.com/page1",
        "https://example.com/page2",
    )
    _EXTERNAL_URLS = (
        "https://external.com/page",
        "https://another-external.org/resource",
    )

    def __init__(self, url, recursive=True, verbose=False, _on_result_callback=None):
        # ``recursive`` and ``verbose`` are accepted and ignored.
        self.url = url
        self.callback = _on_result_callback

    def fetch(self):
        """Do nothing."""
        return None

    def extract_urls(self):
        """Report every fixed URL to the callback, internal URLs first."""
        # Simulate callback calls
        if not self.callback:
            return
        for link in self._INTERNAL_URLS:
            self.callback(link, is_external=False)
        for link in self._EXTERNAL_URLS:
            self.callback(link, is_external=True)

    def get_results(self):
        """Return the fixed URLs wrapped in a MockCrawlResults."""
        return MockCrawlResults(
            internal=list(self._INTERNAL_URLS),
            external=list(self._EXTERNAL_URLS)
        )
@pytest.mark.asyncio
async def test_website_to_links_real_time_neo4j_creation():
    """Test that Neo4j nodes are created in real-time during the callback."""
    scanner = WebsiteToLinks(sketch_id="test", scan_id="test")

    # Mock neo4j connection and methods
    scanner.neo4j_conn = Mock()
    scanner.create_node = Mock()
    scanner.create_relationship = Mock()
    scanner.log_graph_message = Mock()

    # Test input
    main_url = "https://example.com"
    main_domain = "example.com"
    websites = [Website(url=main_url)]

    # Patch Crawler in the module WebsiteToLinks is imported from, so the
    # scanner resolves the mock instead of the real crawler.
    with patch('flowsint_transforms.websites.to_links.Crawler', MockCrawler):
        await scanner.scan(websites)

    # URLs reported by MockCrawler; external URLs map to their domain.
    internal_urls = ["https://example.com/page1", "https://example.com/page2"]
    external_domains = {
        "https://external.com/page": "external.com",
        "https://another-external.org/resource": "another-external.org",
    }

    # Website nodes: the main site, then the internal and external pages
    # created in the callback.
    for url in [main_url, *internal_urls, *external_domains]:
        scanner.create_node.assert_any_call('website', 'url', url,
                                            caption=url, type='website')

    # Domain nodes: the main domain and every external domain.
    for domain in [main_domain, *external_domains.values()]:
        scanner.create_node.assert_any_call('domain', 'name', domain,
                                            caption=domain, type='domain')

    # Verify main website to domain relationship
    scanner.create_relationship.assert_any_call('website', 'url', main_url,
                                                'domain', 'name', main_domain, 'BELONGS_TO_DOMAIN')

    # Verify main website links to every internal and external website
    for url in [*internal_urls, *external_domains]:
        scanner.create_relationship.assert_any_call('website', 'url', main_url,
                                                    'website', 'url', url, 'LINKS_TO')

    for url, domain in external_domains.items():
        # Verify external website to domain relationships
        scanner.create_relationship.assert_any_call('website', 'url', url,
                                                    'domain', 'name', domain, 'BELONGS_TO_DOMAIN')
        # Verify main website to external domain relationships
        scanner.create_relationship.assert_any_call('website', 'url', main_url,
                                                    'domain', 'name', domain, 'LINKS_TO_DOMAIN')
@pytest.mark.asyncio
async def test_website_to_links_error_handling_with_neo4j():
    """Test that main nodes are still created even when crawling fails."""
    scanner = WebsiteToLinks(sketch_id="test", scan_id="test")

    # Mock neo4j connection and methods
    scanner.neo4j_conn = Mock()
    scanner.create_node = Mock()
    scanner.create_relationship = Mock()
    scanner.log_graph_message = Mock()

    # Mock crawler that raises an exception
    def mock_crawler_error(*args, **kwargs):
        raise Exception("Test error")

    websites = [Website(url="https://example.com")]

    # Patch Crawler in the module WebsiteToLinks is imported from, so the
    # scanner resolves the failing mock instead of the real crawler.
    with patch('flowsint_transforms.websites.to_links.Crawler', mock_crawler_error):
        results = await scanner.scan(websites)

    # Verify main website and domain nodes were still created despite error
    scanner.create_node.assert_any_call('website', 'url', 'https://example.com',
                                        caption='https://example.com', type='website')
    scanner.create_node.assert_any_call('domain', 'name', 'example.com',
                                        caption='example.com', type='domain')

    # Verify main website to domain relationship was created
    scanner.create_relationship.assert_any_call('website', 'url', 'https://example.com',
                                                'domain', 'name', 'example.com', 'BELONGS_TO_DOMAIN')

    # Verify result structure
    assert len(results) == 1
    result = results[0]
    assert result["website"] == "https://example.com"
    assert result["main_domain"] == "example.com"
    assert result["internal_urls"] == []
    assert result["external_urls"] == []
    assert result["external_domains"] == []
def test_postprocess_simplified():
    """postprocess() hands back the results it was given, unchanged."""
    scanner = WebsiteToLinks(sketch_id="test", scan_id="test")
    original_input = [Website(url="https://example.com")]
    crawl_summary = {
        "website": "https://example.com",
        "main_domain": "example.com",
        "internal_urls": ["https://example.com/page1"],
        "external_urls": ["https://external.com/page"],
        "external_domains": ["external.com"]
    }
    results = [crawl_summary]

    processed_results = scanner.postprocess(results, original_input)

    # Should just return the same results since Neo4j work is done in real-time
    assert processed_results == results

View File

@@ -0,0 +1,42 @@
import re
from typing import Dict
from app.tools.network.asnmap import AsnmapTool
# Module-level tool instance shared by the tests in this file.
tool = AsnmapTool()
def test_name():
    """The tool reports its name as "asnmap"."""
    reported = tool.name()
    assert reported == "asnmap"
def test_description():
    """The tool reports its fixed description string."""
    reported = tool.description()
    assert reported == "ASN mapping and network reconnaissance tool."
def test_category():
    """The tool reports its category as "ASN discovery"."""
    reported = tool.category()
    assert reported == "ASN discovery"
def test_image():
    """The tool reports the projectdiscovery/asnmap image."""
    reported = tool.get_image()
    assert reported == "projectdiscovery/asnmap"
def test_install():
    """After install(), is_installed() reports True."""
    tool.install()
    # Identity check instead of ``== True``: requires a real boolean.
    assert tool.is_installed() is True
def test_version():
    """version() returns a string made of "v" followed by digits and dots."""
    tool.install()
    reported = tool.version()
    # Check that version follows the expected format: v followed by digits and dots
    format_match = re.match(r'^v[\d\.]+$', reported)
    assert format_match
def test_launch_no_api_key():
    """launch() raises a KeyError matching "Missing key"."""
    import pytest

    # NOTE(review): presumably this relies on no API key being configured in
    # the test environment; test_launch makes the same call and expects a
    # result — confirm how the two are meant to coexist.
    expected_failure = pytest.raises(KeyError, match="Missing key")
    with expected_failure:
        tool.launch("alliage.io", 'domain')
def test_launch_wrong_type():
    """launch() rejects the unsupported type 'domains' with a ValueError."""
    import pytest

    expected_failure = pytest.raises(ValueError, match="Invalid type: 'domains'")
    with expected_failure:
        tool.launch("alliage.io", 'domains')
def test_launch():
    """launch() on a domain returns a dictionary."""
    outcome = tool.launch("alliage.io", 'domain')
    # isinstance against the builtin is equivalent to the typing.Dict alias.
    assert isinstance(outcome, dict)

View File

@@ -0,0 +1,40 @@
import re
from typing import List
from app.tools.network.httpx import HttpxTool
# Module-level tool instance shared by the tests in this file.
tool = HttpxTool()
def test_name():
    """The tool reports its name as "httpx"."""
    reported = tool.name()
    assert reported == "httpx"
def test_description():
    """The tool reports its fixed description string."""
    reported = tool.description()
    assert reported == "An HTTP toolkit that probes services, web servers, and other valuable metadata."
def test_category():
    """The tool reports its category as "Web technologies enumeration"."""
    reported = tool.category()
    assert reported == "Web technologies enumeration"
def test_image():
    """The tool reports the projectdiscovery/httpx image."""
    reported = tool.get_image()
    assert reported == "projectdiscovery/httpx"
def test_install():
    """After install(), is_installed() reports True."""
    tool.install()
    # Identity check instead of ``== True``: requires a real boolean.
    assert tool.is_installed() is True
def test_version():
    """version() returns a string made of "v" followed by digits and dots."""
    tool.install()
    reported = tool.version()
    # Check that version follows the expected format: v followed by digits and dots
    format_match = re.match(r'^v[\d\.]+$', reported)
    assert format_match
def test_launch():
    """launch() on a URL returns a list."""
    results = tool.launch("https://alliage.io")
    assert isinstance(results, list)
def test_launch_unreached_host():
    """launch() on an unreachable host returns an empty list."""
    results = tool.launch("https://this-is-not-a-valid-domain.local")
    assert isinstance(results, list)
    assert len(results) == 0

View File

@@ -0,0 +1,20 @@
import re
from typing import Dict
from app.tools.network.reconcrawl import ReconCrawlTool
# Module-level tool instance shared by the tests in this file.
tool = ReconCrawlTool()
def test_name():
    """The tool reports its name as "reconcrawl"."""
    reported = tool.name()
    assert reported == "reconcrawl"
def test_description():
    """The tool reports its fixed description string."""
    reported = tool.description()
    assert reported == "Emails and phone numbers crawler from websites by analyzing their HTML and embedded scripts."
def test_category():
    """The tool reports its category as "Crawler"."""
    reported = tool.category()
    assert reported == "Crawler"
def test_install():
    """After install(), is_installed() reports True."""
    tool.install()
    # Identity check instead of ``== True``: requires a real boolean.
    assert tool.is_installed() is True

View File

@@ -0,0 +1,32 @@
import re
from app.tools.network.subfinder import SubfinderTool
# Module-level tool instance shared by the tests in this file.
tool = SubfinderTool()
def test_name():
    """The tool reports its name as "subfinder"."""
    reported = tool.name()
    assert reported == "subfinder"
def test_description():
    """The tool reports its fixed description string."""
    reported = tool.description()
    assert reported == "Fast passive subdomain enumeration tool."
def test_category():
    """The tool reports its category as "Subdomain enumeration"."""
    reported = tool.category()
    assert reported == "Subdomain enumeration"
def test_image():
    """The tool reports the projectdiscovery/subfinder image."""
    reported = tool.get_image()
    assert reported == "projectdiscovery/subfinder"
def test_install():
    """After install(), is_installed() reports True."""
    tool.install()
    # Identity check instead of ``== True``: requires a real boolean.
    assert tool.is_installed() is True
def test_version():
    """version() returns a string made of "v" followed by digits and dots."""
    tool.install()
    reported = tool.version()
    # Check that version follows the expected format: v followed by digits and dots
    format_match = re.match(r'^v[\d\.]+$', reported)
    assert format_match
def test_launch():
    """Launch the tool on a domain and check it returns a list of strings."""
    found = tool.launch("alliage.io")
    assert isinstance(found, list)
    # Every returned entry must be a plain string.
    non_strings = [entry for entry in found if not isinstance(entry, str)]
    assert not non_strings

View File

@@ -0,0 +1,29 @@
import re
from typing import Dict
from app.tools.organizations.sirene import SireneTool
# Single SireneTool instance shared by every test in this module.
tool = SireneTool()
def test_name():
    """Check that the tool identifies itself as "sirene"."""
    reported_name = tool.name()
    assert reported_name == "sirene"
def test_description():
    """Check the human-readable description exposed by the tool."""
    reported_description = tool.description()
    assert reported_description == "The Sirene API allows you to query the Sirene directory of businesses and establishments, managed by Insee."
def test_category():
    """Check the category label the tool is filed under."""
    reported_category = tool.category()
    assert reported_category == "Business intelligence"
def test_launch_org():
    """Query the tool with an organization name and check the result shape.

    The call must return a list whose items are all dictionaries.
    """
    results = tool.launch("blablacar", 1)
    assert isinstance(results, list)
    # Builtin ``dict`` matches the ``list`` check above and avoids the
    # deprecated ``typing.Dict`` alias in a runtime isinstance test.
    assert all(isinstance(item, dict) for item in results)
def test_launch_person():
    """Query the tool with a "+"-separated person name and check the result shape.

    The call must return a list whose items are all dictionaries.
    """
    results = tool.launch("Karim+Terrache", 1)
    assert isinstance(results, list)
    # Builtin ``dict`` matches the ``list`` check above and avoids the
    # deprecated ``typing.Dict`` alias in a runtime isinstance test.
    assert all(isinstance(item, dict) for item in results)
def test_launch_person_space_format():
    """Query the tool with a space-separated person name and check the result shape.

    The call must return a list whose items are all dictionaries.
    """
    results = tool.launch("Karim Terrache", 1)
    assert isinstance(results, list)
    # Builtin ``dict`` matches the ``list`` check above and avoids the
    # deprecated ``typing.Dict`` alias in a runtime isinstance test.
    assert all(isinstance(item, dict) for item in results)

View File

@@ -0,0 +1,41 @@
from flowsint_core.core.scanner_base import build_params_model
def test_build_params_model_valid():
    """Build a params model from a two-entry schema and validate one input.

    Only the required API key is supplied; the test expects it to be kept
    as given and the optional ``url`` to take its declared default.
    """
    api_key_entry = {
        "name": "ETHERSCAN_API_KEY",
        "type": "string",
        "description": "The Etherscan API key to use for the transaction lookup.",
        "required": True,
    }
    url_entry = {
        "name": "url",
        "type": "string",
        "description": "Base URL for API",
        "required": False,
        "default": "https://api.etherscan.io/api",
    }
    ParamsModel = build_params_model([api_key_entry, url_entry])
    params = ParamsModel(ETHERSCAN_API_KEY="clef-123")
    assert params.ETHERSCAN_API_KEY == "clef-123"
    assert params.url == "https://api.etherscan.io/api"
def test_build_params_model_invalid():
    """Build a params model from a schema whose first entry is an empty dict.

    NOTE(review): despite the "invalid" name, this test asserts that model
    construction and validation succeed, with the same expectations as the
    valid case. Confirm whether an empty schema entry is meant to be
    rejected by ``build_params_model`` instead.
    """
    empty_entry = {}
    url_entry = {
        "name": "url",
        "type": "string",
        "description": "Base URL for API",
        "required": False,
        "default": "https://api.etherscan.io/api",
    }
    ParamsModel = build_params_model([empty_entry, url_entry])
    params = ParamsModel(ETHERSCAN_API_KEY="clef-123")
    assert params.ETHERSCAN_API_KEY == "clef-123"
    assert params.url == "https://api.etherscan.io/api"