feat: python formatter finally working

This commit is contained in:
dextmorgn
2025-08-13 20:08:06 +02:00
parent 4d5f96bb8d
commit 7620a6d145
148 changed files with 6777 additions and 3269 deletions

View File

@@ -1,9 +1,12 @@
import pytest
from tests.logger import TestLogger
@pytest.fixture(autouse=True)
def mock_logger(monkeypatch):
    """Swap the production ``Logger`` for ``TestLogger`` in every test.

    The fixture is autouse, so no test has to request it explicitly.
    """

    def _discard_event(*args, **kwargs):
        # Stand-in for emit_event_task.delay: accept anything, do nothing.
        return None

    monkeypatch.setattr("flowsint_core.core.logger.Logger", TestLogger)
    monkeypatch.setattr(
        "flowsint_core.core.logger.emit_event_task.delay", _discard_event
    )

View File

@@ -1,11 +1,10 @@
import sys
import os
import asyncio
if __name__ == "__main__":
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from flowsint_types.domain import Domain
from flowsint_types.ip import Ip
from flowsint_transforms.domains.resolve import ResolveScanner
@@ -14,15 +13,16 @@ from flowsint_transforms.domains.resolve import ResolveScanner
async def main():
    """Smoke-test ``ResolveScanner.postprocess`` with hard-coded sample data."""
    # Sample inputs
    sample_domains = [Domain(domain="adaltas.com")]
    sample_ips = [Ip(address=addr) for addr in ("12.23.34.45", "56.67.78.89")]

    resolver = ResolveScanner("sketch_123", "scan_123")

    # Only the first IP is passed so the result list has the same length
    # as the domain list.
    first_ip_only = sample_ips[:1]
    resolver.postprocess(first_ip_only, sample_domains)

    print("Postprocess test completed successfully!")
# Run the smoke test exactly once, and only when executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -5,7 +5,7 @@ EventLevel = Literal["info", "warn", "error", "success", "debug"]
LEVEL_MAP = {
"info": "INFO",
"warn": "WARN",
"warn": "WARN",
"error": "FAILED",
"success": "SUCCESS",
"debug": "DEBUG",
@@ -17,13 +17,15 @@ class TestLogger:
def _format_message(type: str, message: str) -> str:
"""Format the log message with type prefix"""
return f"[{type.upper()}] {message}"
@staticmethod
def _create_log(sketch_id: Union[str, UUID], log_type: str, content: str) -> Any:
"""Create a dummy log object for testing"""
class DummyLog:
def __init__(self):
self.id = 'dummy_id'
self.id = "dummy_id"
return DummyLog()
@staticmethod

View File

@@ -1,17 +1,22 @@
from flowsint_transforms.crypto.wallet_to_nfts import CryptoWalletAddressToNFTs
from flowsint_types.wallet import CryptoWallet, CryptoNFT
from pydantic import HttpUrl
scanner = CryptoWalletAddressToNFTs("sketch_123", "scan_123")
def test_wallet_address_to_transactions_name():
    """The transform under test reports the name ``wallet_to_nfts``."""
    # NOTE(review): the function name says "transactions" but the scanner here
    # is the NFT transform — looks like a copy-paste leftover; confirm and rename.
    expected_name = "wallet_to_nfts"
    assert scanner.name() == expected_name
def test_wallet_address_to_transactions_category():
    """The transform under test reports the category ``crypto``."""
    expected_category = "crypto"
    assert scanner.category() == expected_category
def test_wallet_address_to_transactions_key():
    """The transform under test is keyed on ``address``."""
    expected_key = "address"
    assert scanner.key() == expected_key
def test_preprocess_with_string():
input_data = ["0x742d35Cc6634C0532925a3b844Bc454e4438f44e"]
result = scanner.preprocess(input_data)
@@ -19,6 +24,7 @@ def test_preprocess_with_string():
assert isinstance(result[0], CryptoWallet)
assert result[0].address == "0x742d35Cc6634C0532925a3b844Bc454e4438f44e"
def test_preprocess_with_dict():
input_data = [{"address": "0x742d35Cc6634C0532925a3b844Bc454e4438f44e"}]
result = scanner.preprocess(input_data)
@@ -26,6 +32,7 @@ def test_preprocess_with_dict():
assert isinstance(result[0], CryptoWallet)
assert result[0].address == "0x742d35Cc6634C0532925a3b844Bc454e4438f44e"
def test_preprocess_with_wallet_object():
wallet = CryptoWallet(address="0x742d35Cc6634C0532925a3b844Bc454e4438f44e")
input_data = [wallet]
@@ -34,26 +41,29 @@ def test_preprocess_with_wallet_object():
assert isinstance(result[0], CryptoWallet)
assert result[0].address == "0x742d35Cc6634C0532925a3b844Bc454e4438f44e"
def test_scan_mocked_transactions(monkeypatch):
# Mock the _get_transactions method
def mock_get_nfts(address):
return [
CryptoNFT(
wallet=CryptoWallet(address="0x742d35Cc6634C0532925a3b844Bc454e4438f44e"),
wallet=CryptoWallet(
address="0x742d35Cc6634C0532925a3b844Bc454e4438f44e"
),
contract_address="0x123",
token_id="1",
collection_name="Test Collection",
metadata_url="https://example.com/metadata.json",
image_url="https://example.com/image.png",
name="Test NFT"
name="Test NFT",
)
]
monkeypatch.setattr(scanner, "_get_nfts", mock_get_nfts)
input_data = [CryptoWallet(address="0x742d35Cc6634C0532925a3b844Bc454e4438f44e")]
result = scanner.scan(input_data)
assert len(result) == 1
assert len(result[0]) == 1
assert result[0][0].contract_address == "0x123"
@@ -61,4 +71,3 @@ def test_scan_mocked_transactions(monkeypatch):
assert result[0][0].metadata_url == HttpUrl("https://example.com/metadata.json")
assert result[0][0].image_url == HttpUrl("https://example.com/image.png")
assert result[0][0].name == "Test NFT"

View File

@@ -1,18 +1,28 @@
import pytest
from flowsint_transforms.crypto.wallet_to_transactions import CryptoWalletAddressToTransactions
from flowsint_transforms.crypto.wallet_to_transactions import (
CryptoWalletAddressToTransactions,
)
from flowsint_types.wallet import CryptoWallet, CryptoWalletTransaction
# Shared transform instance used by every test in this module.
# NOTE(review): the API key value looks like a placeholder, not a real credential — confirm.
_SCANNER_PARAMS = {"ETHERSCAN_API_KEY": "ta-clef-api"}
scanner = CryptoWalletAddressToTransactions(
    "sketch_123",
    "scan_123",
    params=_SCANNER_PARAMS,
)
def test_wallet_address_to_transactions_name():
    """The transform reports the name ``wallet_to_transactions``."""
    expected_name = "wallet_to_transactions"
    assert scanner.name() == expected_name
def test_wallet_address_to_transactions_category():
    """The transform reports the category ``CryptoCryptoWallet``."""
    # NOTE(review): the doubled "Crypto" prefix looks unusual — confirm it
    # matches what the transform's category() is meant to return.
    expected_category = "CryptoCryptoWallet"
    assert scanner.category() == expected_category
def test_wallet_address_to_transactions_key():
    """The transform is keyed on ``address``."""
    expected_key = "address"
    assert scanner.key() == expected_key
def test_preprocess_with_string():
input_data = ["0x742d35Cc6634C0532925a3b844Bc454e4438f44e"]
result = scanner.preprocess(input_data)
@@ -20,6 +30,7 @@ def test_preprocess_with_string():
assert isinstance(result[0], CryptoWallet)
assert result[0].address == "0x742d35Cc6634C0532925a3b844Bc454e4438f44e"
def test_preprocess_with_dict():
input_data = [{"address": "0x742d35Cc6634C0532925a3b844Bc454e4438f44e"}]
result = scanner.preprocess(input_data)
@@ -27,6 +38,7 @@ def test_preprocess_with_dict():
assert isinstance(result[0], CryptoWallet)
assert result[0].address == "0x742d35Cc6634C0532925a3b844Bc454e4438f44e"
def test_preprocess_with_wallet_object():
wallet = CryptoWallet(address="0x742d35Cc6634C0532925a3b844Bc454e4438f44e")
input_data = [wallet]
@@ -35,6 +47,7 @@ def test_preprocess_with_wallet_object():
assert isinstance(result[0], CryptoWallet)
assert result[0].address == "0x742d35Cc6634C0532925a3b844Bc454e4438f44e"
@pytest.mark.asyncio
async def test_scan_mocked_transactions(monkeypatch):
# Mock the _get_transactions method - note it takes address and api_key parameters
@@ -42,7 +55,9 @@ async def test_scan_mocked_transactions(monkeypatch):
return [
CryptoWalletTransaction(
hash="0x123",
source=CryptoWallet(address="0x742d35Cc6634C0532925a3b844Bc454e4438f44e"),
source=CryptoWallet(
address="0x742d35Cc6634C0532925a3b844Bc454e4438f44e"
),
target=CryptoWallet(address="0x456"),
value=1.0, # 1 ETH
timestamp="1234567890",
@@ -55,15 +70,15 @@ async def test_scan_mocked_transactions(monkeypatch):
gas_used="21000",
cumulative_gas_used="21000",
input="0x",
contract_address=None
contract_address=None,
)
]
monkeypatch.setattr(scanner, "_get_transactions", mock_get_transactions)
input_data = [CryptoWallet(address="0x742d35Cc6634C0532925a3b844Bc454e4438f44e")]
result = await scanner.scan(input_data)
assert len(result) == 1
assert len(result[0]) == 1
assert result[0][0].hash == "0x123"
@@ -72,13 +87,20 @@ async def test_scan_mocked_transactions(monkeypatch):
assert result[0][0].value == 1.0
assert result[0][0].timestamp == "1234567890"
def test_scanner_requires_api_key():
    """Constructing the scanner with empty params must raise ``ValueError``.

    The error message is expected to match the "received invalid params"
    text for the ``wallet_to_transactions`` scanner.
    """
    expected_error = "Scanner wallet_to_transactions received invalid params"
    # A single context manager: stacking two would let the inner one swallow
    # the exception and make the outer one fail.
    with pytest.raises(ValueError, match=expected_error):
        CryptoWalletAddressToTransactions("sketch_123", "scan_123", params={})
def test_scanner_with_invalid_api_key_type():
    """A non-string ``ETHERSCAN_API_KEY`` must be rejected with ``ValueError``."""
    expected_error = "Scanner wallet_to_transactions received invalid params"
    bad_params = {"ETHERSCAN_API_KEY": 123}  # int where the test expects rejection
    # One raises-context and one constructor call; the duplicated pair is merged.
    with pytest.raises(ValueError, match=expected_error):
        CryptoWalletAddressToTransactions("sketch_123", "scan_123", params=bad_params)

View File

@@ -6,18 +6,20 @@ import pytest
scanner = ResolveScanner("sketch_123", "scan_123")
def test_preprocess_valid_domains():
    """``preprocess`` returns the given ``Domain`` objects' names, in order."""
    given = [Domain(domain=name) for name in ("example.com", "example2.com")]

    kept = scanner.preprocess(given)

    assert [item.domain for item in kept] == [item.domain for item in given]
def test_unprocessed_valid_domains():
domains = [
"example.com",
@@ -26,8 +28,9 @@ def test_unprocessed_valid_domains():
result = scanner.preprocess(domains)
result_domains = [d for d in result]
expected_domains = [Domain(domain=d) for d in domains]
assert result_domains == expected_domains
assert result_domains == expected_domains
def test_preprocess_invalid_domains():
domains = [
Domain(domain="example.com"),
@@ -41,6 +44,7 @@ def test_preprocess_invalid_domains():
assert "example.org" in result_domains
assert "invalid_domain" not in result_domains
def test_preprocess_multiple_formats():
domains = [
{"domain": "example.com"},
@@ -56,6 +60,7 @@ def test_preprocess_multiple_formats():
assert "invalid_domain" not in result_domains
assert "example.io" not in result_domains
@pytest.mark.asyncio
async def test_scan_returns_ip(monkeypatch):
# on crée une fonction mock qui retourne une IP
@@ -70,78 +75,79 @@ async def test_scan_returns_ip(monkeypatch):
assert isinstance(output, list)
assert output[0].address == "12.23.34.45"
def test_schemas():
    """Check schema type names and key properties rather than exact equality."""
    input_schema = scanner.input_schema()
    output_schema = scanner.output_schema()

    expectations = (
        (input_schema, "Domain", "domain"),
        (output_schema, "Ip", "address"),
    )
    for schema, type_name, required_property in expectations:
        assert schema["type"] == type_name
        assert isinstance(schema["properties"], list)
        property_names = [prop["name"] for prop in schema["properties"]]
        assert required_property in property_names
class TestResolveInputOutputTypes:
    """Exercise ResolveScanner's InputType/OutputType attributes and schema helpers."""

    def test_input_output_types_are_defined(self):
        """Both type attributes exist and equal the expected ``List[...]`` types."""
        for attr_name in ("InputType", "OutputType"):
            assert hasattr(ResolveScanner, attr_name)
        assert ResolveScanner.InputType == List[Domain]
        assert ResolveScanner.OutputType == List[Ip]

    def test_schemas_use_generate_methods(self):
        """The generate_* methods, called on the class, return typed dicts."""
        generated = {
            "Domain": ResolveScanner.generate_input_schema(),
            "Ip": ResolveScanner.generate_output_schema(),
        }
        for schema in generated.values():
            assert isinstance(schema, dict)
        for type_name, schema in generated.items():
            assert schema["type"] == type_name

    def test_schema_methods_return_same_as_generate_methods(self):
        """``input_schema``/``output_schema`` agree with their generate_* twins."""
        pairs = (
            (ResolveScanner.input_schema, ResolveScanner.generate_input_schema),
            (ResolveScanner.output_schema, ResolveScanner.generate_output_schema),
        )
        for schema_method, generate_method in pairs:
            assert schema_method() == generate_method()

    def test_input_schema_properties(self):
        """The input schema lists a ``domain`` property."""
        listed = [prop["name"] for prop in ResolveScanner.input_schema()["properties"]]
        assert "domain" in listed

    def test_output_schema_properties(self):
        """The output schema lists an ``address`` property."""
        listed = [prop["name"] for prop in ResolveScanner.output_schema()["properties"]]
        assert "address" in listed

    def test_type_accessibility_from_instance(self):
        """Types and schema generation are also reachable from an instance."""
        instance = ResolveScanner("test", "test")

        assert instance.InputType == List[Domain]
        assert instance.OutputType == List[Ip]

        # Schema generation must work through the instance as well.
        from_input = instance.generate_input_schema()
        from_output = instance.generate_output_schema()
        assert from_input["type"] == "Domain"
        assert from_output["type"] == "Ip"

View File

@@ -3,18 +3,20 @@ from flowsint_types.domain import Domain, Domain
scanner = SubdomainScanner("sketch_123", "scan_123")
def test_preprocess_valid_domains():
    """``preprocess`` returns the given ``Domain`` objects' names, in order."""
    given = [Domain(domain=name) for name in ("example.com", "example2.com")]

    kept = scanner.preprocess(given)

    assert [item.domain for item in kept] == [item.domain for item in given]
def test_unprocessed_valid_domains():
domains = [
"example.com",
@@ -23,8 +25,9 @@ def test_unprocessed_valid_domains():
result = scanner.preprocess(domains)
result_domains = [d for d in result]
expected_domains = [Domain(domain=d) for d in domains]
assert result_domains == expected_domains
assert result_domains == expected_domains
def test_preprocess_invalid_domains():
domains = [
Domain(domain="example.com"),
@@ -38,6 +41,7 @@ def test_preprocess_invalid_domains():
assert "example.org" in result_domains
assert "invalid_domain" not in result_domains
def test_preprocess_multiple_formats():
domains = [
{"domain": "example.com"},
@@ -83,13 +87,9 @@ def test_scan_extracts_subdomains(monkeypatch):
input_data = [Domain(domain="example.com")]
domains = scanner.execute(input_data)
assert isinstance(domains, list)
for sub in domains:
for sub in domains:
print(sub)
assert isinstance(sub, Domain)
expected = sorted([
"mail.example.com",
"www.example.com",
"api.example.com"
])
expected = sorted(["mail.example.com", "www.example.com", "api.example.com"])
print(domains)
# assert domains[0].subdomains == expected

View File

@@ -3,18 +3,20 @@ from flowsint_types.domain import Domain
scanner = WhoisScanner("sketch_123", "scan_123")
def test_preprocess_valid_domains():
    """``preprocess`` returns the given ``Domain`` objects' names, in order."""
    given = [Domain(domain=name) for name in ("example.com", "example2.com")]

    kept = scanner.preprocess(given)

    assert [item.domain for item in kept] == [item.domain for item in given]
def test_unprocessed_valid_domains():
domains = [
"example.com",
@@ -23,8 +25,9 @@ def test_unprocessed_valid_domains():
result = scanner.preprocess(domains)
result_domains = [d for d in result]
expected_domains = [Domain(domain=d) for d in domains]
assert result_domains == expected_domains
assert result_domains == expected_domains
def test_preprocess_invalid_domains():
domains = [
Domain(domain="example.com"),
@@ -38,6 +41,7 @@ def test_preprocess_invalid_domains():
assert "example.org" in result_domains
assert "invalid_domain" not in result_domains
def test_preprocess_multiple_formats():
domains = [
{"domain": "example.com"},
@@ -53,6 +57,7 @@ def test_preprocess_multiple_formats():
assert "invalid_domain" not in result_domains
assert "example.io" not in result_domains
def test_scan_returns_whois_objects(monkeypatch):
# Patch `whois.whois` to avoid real network call
mock_whois = lambda domain: {
@@ -62,7 +67,7 @@ def test_scan_returns_whois_objects(monkeypatch):
"country": "MockCountry",
"emails": ["admin@example.com"],
"creation_date": "2020-01-01",
"expiration_date": "2030-01-01"
"expiration_date": "2030-01-01",
}
monkeypatch.setattr("whois.whois", mock_whois)
@@ -74,8 +79,25 @@ def test_scan_returns_whois_objects(monkeypatch):
assert output[0].whois.org == "MockOrg"
assert output[0].whois.email.email == "admin@example.com"
def test_schemas():
    """Input and output schemas both equal the same ``Domain`` schema dict."""
    input_schema = scanner.input_schema()
    output_schema = scanner.output_schema()

    # The expected input and output schemas are identical, so the literal
    # is written once and compared against both.
    domain_schema = {
        "type": "Domain",
        "properties": [
            {"name": "domain", "type": "string"},
            {"name": "subdomains", "type": "array | null"},
            {"name": "ips", "type": "array | null"},
            {"name": "whois", "type": "Whois | null"},
        ],
    }
    assert input_schema == domain_schema
    assert output_schema == domain_schema

View File

@@ -28,7 +28,9 @@ class TestEmailToGravatarScanner:
assert schema["type"] == "Email"
assert "properties" in schema
# Check that email property is present
email_prop = next((prop for prop in schema["properties"] if prop["name"] == "email"), None)
email_prop = next(
(prop for prop in schema["properties"] if prop["name"] == "email"), None
)
assert email_prop is not None
assert email_prop["type"] == "string"
@@ -38,8 +40,12 @@ class TestEmailToGravatarScanner:
assert schema["type"] == "Gravatar"
assert "properties" in schema
# Check that required properties are present
src_prop = next((prop for prop in schema["properties"] if prop["name"] == "src"), None)
hash_prop = next((prop for prop in schema["properties"] if prop["name"] == "hash"), None)
src_prop = next(
(prop for prop in schema["properties"] if prop["name"] == "src"), None
)
hash_prop = next(
(prop for prop in schema["properties"] if prop["name"] == "hash"), None
)
assert src_prop is not None
assert hash_prop is not None
@@ -114,7 +120,7 @@ class TestEmailToGravatarScanner:
result = scanner.preprocess([])
assert result == []
@patch('requests.get')
@patch("requests.get")
def test_scan_successful_gravatar(self, mock_get):
"""Test successful gravatar retrieval"""
# Mock successful response
@@ -130,7 +136,7 @@ class TestEmailToGravatarScanner:
assert result[0].hash == hashlib.md5("test@example.com".encode()).hexdigest()
assert "gravatar.com/avatar/" in str(result[0].src)
@patch('requests.get')
@patch("requests.get")
def test_scan_failed_request(self, mock_get):
"""Test handling of failed HTTP requests"""
# Mock failed response
@@ -143,7 +149,7 @@ class TestEmailToGravatarScanner:
assert len(result) == 0
@patch('requests.get')
@patch("requests.get")
def test_scan_request_exception(self, mock_get):
"""Test handling of request exceptions"""
# Mock exception
@@ -154,7 +160,7 @@ class TestEmailToGravatarScanner:
assert len(result) == 0
@patch('requests.get')
@patch("requests.get")
def test_scan_multiple_emails(self, mock_get):
"""Test scanning multiple emails"""
# Mock successful responses
@@ -173,9 +179,10 @@ class TestEmailToGravatarScanner:
assert all(isinstance(gravatar, Gravatar) for gravatar in result)
assert mock_get.call_count == 3
@patch('requests.get')
@patch("requests.get")
def test_scan_mixed_success_failure(self, mock_get):
"""Test scanning with mixed success and failure"""
# Mock mixed responses - check the actual URL being called
def side_effect(url, *args, **kwargs):
mock_response = Mock()
@@ -203,7 +210,9 @@ class TestEmailToGravatarScanner:
"""Test postprocessing with Neo4j connection"""
# Mock Neo4j connection
mock_neo4j = Mock()
scanner_with_neo4j = EmailToGravatarScanner("sketch_123", "scan_123", neo4j_conn=mock_neo4j)
scanner_with_neo4j = EmailToGravatarScanner(
"sketch_123", "scan_123", neo4j_conn=mock_neo4j
)
gravatars = [
Gravatar(src="https://www.gravatar.com/avatar/hash1", hash="hash1"),
@@ -218,7 +227,7 @@ class TestEmailToGravatarScanner:
# Verify Neo4j queries were executed
assert mock_neo4j.query.call_count == 2
# Check that results are returned unchanged
assert result == gravatars
@@ -261,7 +270,7 @@ class TestEmailToGravatarScanner:
def test_execute_full_workflow(self):
"""Test the complete execute workflow"""
with patch('requests.get') as mock_get:
with patch("requests.get") as mock_get:
# Mock successful response
mock_response = Mock()
mock_response.status_code = 200
@@ -272,18 +281,20 @@ class TestEmailToGravatarScanner:
assert len(result) == 1
assert isinstance(result[0], Gravatar)
assert result[0].hash == hashlib.md5("test@example.com".encode()).hexdigest()
assert (
result[0].hash == hashlib.md5("test@example.com".encode()).hexdigest()
)
def test_execute_with_invalid_input(self):
"""Test execute with invalid input"""
emails = ["not-an-email", "also-invalid"]
with patch('requests.get') as mock_get:
with patch("requests.get") as mock_get:
# Mock successful response for any request
mock_response = Mock()
mock_response.status_code = 200
mock_get.return_value = mock_response
result = scanner.execute(emails)
# The scanner processes any string as an email, so it will create Email objects
@@ -295,8 +306,8 @@ class TestEmailToGravatarScanner:
"""Test that gravatar hash is calculated correctly"""
email = "test@example.com"
expected_hash = hashlib.md5(email.encode()).hexdigest()
with patch('requests.get') as mock_get:
with patch("requests.get") as mock_get:
mock_response = Mock()
mock_response.status_code = 200
mock_get.return_value = mock_response
@@ -312,8 +323,8 @@ class TestEmailToGravatarScanner:
email = "test@example.com"
expected_hash = hashlib.md5(email.encode()).hexdigest()
expected_url = f"https://www.gravatar.com/avatar/{expected_hash}"
with patch('requests.get') as mock_get:
with patch("requests.get") as mock_get:
mock_response = Mock()
mock_response.status_code = 200
mock_get.return_value = mock_response
@@ -322,4 +333,4 @@ class TestEmailToGravatarScanner:
result = scanner.scan(emails)
assert len(result) == 1
assert str(result[0].src) == expected_url
assert str(result[0].src) == expected_url

View File

@@ -6,15 +6,19 @@ from flowsint_types.breach import Breach
scanner = EmailToBreachesScanner("sketch_123", "scan_123")
def test_scanner_name():
    """``EmailToBreachesScanner.name()`` returns ``to_leaks``."""
    expected_name = "to_leaks"
    assert EmailToBreachesScanner.name() == expected_name
def test_scanner_category():
    """``EmailToBreachesScanner.category()`` returns ``Email``."""
    expected_category = "Email"
    assert EmailToBreachesScanner.category() == expected_category
def test_scanner_key():
    """``EmailToBreachesScanner.key()`` returns ``email``."""
    expected_key = "email"
    assert EmailToBreachesScanner.key() == expected_key
def test_preprocess_string_emails():
emails = [
"test@example.com",
@@ -24,6 +28,7 @@ def test_preprocess_string_emails():
expected_emails = [Email(email=email) for email in emails]
assert result == expected_emails
def test_preprocess_dict_emails():
emails = [
{"email": "test@example.com"},
@@ -33,6 +38,7 @@ def test_preprocess_dict_emails():
expected_emails = [Email(email=email["email"]) for email in emails]
assert result == expected_emails
def test_preprocess_email_objects():
emails = [
Email(email="test@example.com"),
@@ -41,6 +47,7 @@ def test_preprocess_email_objects():
result = scanner.preprocess(emails)
assert result == emails
def test_preprocess_mixed_formats():
emails = [
"test@example.com",
@@ -49,27 +56,28 @@ def test_preprocess_mixed_formats():
{"invalid_key": "should_be_ignored@test.com"},
]
result = scanner.preprocess(emails)
result_emails = [email.email for email in result]
assert "test@example.com" in result_emails
assert "user@domain.org" in result_emails
assert "admin@company.com" in result_emails
assert "should_be_ignored@test.com" not in result_emails
@patch('src.transforms.emails.to_leaks.requests.get')
@patch("src.transforms.emails.to_leaks.requests.get")
def test_scan_successful_response(mock_get):
# Mock successful API response
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = [
{"Name": "TestBreach", "Title": "Test Breach", "Domain": "test.com"},
{"Name": "AnotherBreach", "Title": "Another Breach", "Domain": "another.com"}
{"Name": "AnotherBreach", "Title": "Another Breach", "Domain": "another.com"},
]
mock_get.return_value = mock_response
emails = [Email(email="test@example.com")]
result = scanner.scan(emails)
assert len(result) == 2
assert isinstance(result[0], Breach)
assert isinstance(result[1], Breach)
@@ -78,85 +86,90 @@ def test_scan_successful_response(mock_get):
assert result[0].breach["name"] == "testbreach"
assert result[1].breach["name"] == "anotherbreach"
# NOTE(review): the patch target uses the ``src.transforms`` path — confirm it
# is the module the scanner actually resolves ``requests`` from.
@patch("src.transforms.emails.to_leaks.requests.get")
def test_scan_no_breaches_found(mock_get):
    """A 404 response (no breaches found) yields an empty result list."""
    not_found = MagicMock()
    not_found.status_code = 404
    mock_get.return_value = not_found

    found = scanner.scan([Email(email="test@example.com")])

    assert len(found) == 0
@patch("src.transforms.emails.to_leaks.requests.get")
def test_scan_api_error(mock_get):
    """An exception raised by the HTTP call results in an empty result list."""
    mock_get.side_effect = Exception("API Error")

    found = scanner.scan([Email(email="test@example.com")])

    assert len(found) == 0
@patch("src.transforms.emails.to_leaks.requests.get")
def test_scan_missing_name_field(mock_get):
    """An entry without a "Name" field is still returned, named ``unknown``."""
    payload = [
        {"Title": "Test Breach", "Domain": "test.com"},  # no "Name" key
        {"Name": "ValidBreach", "Title": "Valid Breach", "Domain": "valid.com"},
    ]
    api_response = MagicMock()
    api_response.status_code = 200
    api_response.json.return_value = payload
    mock_get.return_value = api_response

    result = scanner.scan([Email(email="test@example.com")])

    assert len(result) == 2
    nameless, named = result[0], result[1]
    assert nameless.name == "unknown"  # default when "Name" is absent
    assert named.name == "validbreach"  # provided name, expected lower-cased
    assert nameless.breach["title"] == "Test Breach"
    assert named.breach["name"] == "validbreach"
@patch("src.transforms.emails.to_leaks.HIBP_API_KEY", None)
def test_scan_no_api_key():
    """``scan`` raises ``ValueError`` when ``HIBP_API_KEY`` is patched to ``None``."""
    lookup = [Email(email="test@example.com")]
    expected_error = "HIBP_API_KEY not set"
    with pytest.raises(ValueError, match=expected_error):
        scanner.scan(lookup)
def test_postprocess():
    """``postprocess`` returns its input unchanged and issues five Neo4j queries.

    The Neo4j connection on the shared module-level ``scanner`` is replaced
    with a mock only for the duration of the call, so the mock does not leak
    into tests that run afterwards.
    """
    breaches = [
        Breach(
            name="testbreach",
            title="Test Breach",
            domain="test.com",
            pwncount=1000,
            breach={"name": "testbreach", "title": "Test Breach"},
        ),
        Breach(
            name="anotherbreach",
            title="Another Breach",
            domain="another.com",
            pwncount=2000,
            breach={"name": "anotherbreach", "title": "Another Breach"},
        ),
    ]
    original_input = [Email(email="test@example.com")]

    # create=True keeps this working whether or not the attribute already
    # exists on the instance; the previous value is restored on exit.
    with patch.object(scanner, "neo4j_conn", MagicMock(), create=True) as fake_conn:
        result = scanner.postprocess(breaches, original_input)

    assert result == breaches

    # Expected queries:
    # - 2 breach node creation queries
    # - 1 email node creation query
    # - 2 relationship creation queries
    # Total: 5 queries
    assert fake_conn.query.call_count == 5

View File

@@ -3,18 +3,20 @@ from flowsint_types.asn import ASN
scanner = AsnToCidrsScanner("sketch_123", "scan_123")
def test_preprocess_valid_asns():
    """``preprocess`` returns the given ``ASN`` objects' numbers, in order."""
    given = [ASN(number=value) for value in (15169, 13335)]

    kept = scanner.preprocess(given)

    assert [item.number for item in kept] == [item.number for item in given]
def test_unprocessed_valid_asns():
asns = [
"15169",
@@ -23,8 +25,9 @@ def test_unprocessed_valid_asns():
result = scanner.preprocess(asns)
result_asns = [asn for asn in result]
expected_asns = [ASN(number=int(asn)) for asn in asns]
assert result_asns == expected_asns
assert result_asns == expected_asns
def test_preprocess_invalid_asns():
asns = [
ASN(number=15169),
@@ -38,6 +41,7 @@ def test_preprocess_invalid_asns():
assert 13335 in result_numbers
assert 999999999999 not in result_numbers
def test_preprocess_multiple_formats():
asns = [
{"number": 15169},
@@ -50,19 +54,23 @@ def test_preprocess_multiple_formats():
result_numbers = [asn.number for asn in result]
assert 15169 in result_numbers
assert 13335 in result_numbers
assert "invalid_key" not in result_numbers # Should be filtered out due to invalid key
assert (
"invalid_key" not in result_numbers
) # Should be filtered out due to invalid key
def test_schemas():
input_schema = scanner.input_schema()
output_schema = scanner.output_schema()
# Input schema should have number field
assert "properties" in input_schema
number_prop = next((prop for prop in input_schema["properties"] if prop["name"] == "number"), None)
number_prop = next(
(prop for prop in input_schema["properties"] if prop["name"] == "number"), None
)
assert number_prop is not None
assert number_prop["type"] == "integer"
# Output schema should have network field
assert "properties" in output_schema
prop_names = [prop["name"] for prop in output_schema["properties"]]

View File

@@ -6,18 +6,20 @@ from tests.logger import TestLogger
logger = TestLogger()
scanner = CidrToIpsScanner("sketch_123", "scan_123", logger)
def test_preprocess_valid_cidrs():
    """``preprocess`` returns the given ``CIDR`` objects' networks, in order."""
    given = [CIDR(network=block) for block in ("8.8.8.0/24", "1.1.1.0/24")]

    kept = scanner.preprocess(given)

    assert [item.network for item in kept] == [item.network for item in given]
def test_preprocess_unprocessed_valid_cidrs():
cidrs = [
"8.8.8.0/24",
@@ -28,6 +30,7 @@ def test_preprocess_unprocessed_valid_cidrs():
expected_cidrs = [CIDR(network=c) for c in cidrs]
assert result_cidrs == expected_cidrs
def test_preprocess_invalid_cidrs():
cidrs = [
CIDR(network="8.8.8.0/24"),
@@ -40,6 +43,7 @@ def test_preprocess_invalid_cidrs():
assert "invalid-cidr" not in result_networks
assert "not-a-cidr" not in result_networks
def test_preprocess_multiple_formats():
cidrs = [
{"network": "8.8.8.0/24"},
@@ -54,6 +58,7 @@ def test_preprocess_multiple_formats():
assert "1.1.1.0/24" not in result_networks
assert "InvalidCIDR" not in result_networks
def test_scan_extracts_ips(monkeypatch):
mock_dnsx_output = """8.35.200.12
8.35.200.112
@@ -75,21 +80,17 @@ def test_scan_extracts_ips(monkeypatch):
input_data = [CIDR(network="8.35.200.0/24")]
ips = scanner.scan(input_data)
assert isinstance(ips, list)
assert len(ips) == 4
expected_ips = [
"8.35.200.12",
"8.35.200.112",
"8.35.200.16",
"8.35.200.170"
]
expected_ips = ["8.35.200.12", "8.35.200.112", "8.35.200.16", "8.35.200.170"]
for ip in ips:
assert isinstance(ip, Ip)
assert ip.address in expected_ips
def test_scan_handles_empty_output(monkeypatch):
class MockSubprocessResult:
def __init__(self):
@@ -103,10 +104,11 @@ def test_scan_handles_empty_output(monkeypatch):
input_data = [CIDR(network="8.8.8.0/24")]
ips = scanner.scan(input_data)
assert isinstance(ips, list)
assert len(ips) == 0
def test_scan_handles_subprocess_exception(monkeypatch):
def mock_subprocess_run(cmd, shell, capture_output, text, timeout):
raise Exception("Subprocess failed")
@@ -115,6 +117,6 @@ def test_scan_handles_subprocess_exception(monkeypatch):
input_data = [CIDR(network="8.8.8.0/24")]
ips = scanner.scan(input_data)
assert isinstance(ips, list)
assert len(ips) == 0
assert len(ips) == 0

View File

@@ -3,6 +3,7 @@ from flowsint_types.ip import Ip, Ip
# Module-level GeolocationScanner instance shared by the tests below.
scanner = GeolocationScanner("sketch_123", "scan_123")
def test_preprocess_valid_ips():
ips = [
Ip(address="8.8.8.8"),
@@ -13,6 +14,7 @@ def test_preprocess_valid_ips():
expected_ips = [d.address for d in ips]
assert result_ips == expected_ips
def test_preprocess_string_ips():
ips = [
"8.8.8.8",
@@ -23,6 +25,7 @@ def test_preprocess_string_ips():
expected_ips = [d for d in ips]
assert [ip.address for ip in result] == expected_ips
def test_preprocess_invalid_ips():
ips = [
Ip(address="8.8.8.8"),
@@ -35,6 +38,7 @@ def test_preprocess_invalid_ips():
assert "1.1.1.1" in result_ips
assert "invalid_ip" not in result_ips
def test_preprocess_multiple_formats():
ips = [
{"address": "8.8.8.8"},
@@ -48,6 +52,7 @@ def test_preprocess_multiple_formats():
assert "1.1.1.1" in result_ips
assert "1.2.3.4" not in result_ips
def test_scan_returns_ip(monkeypatch):
# Mock of get_location_data
def mock_get_location_data(address):
@@ -56,7 +61,7 @@ def test_scan_returns_ip(monkeypatch):
"longitude": -122.0838,
"country": "US",
"city": "Mountain View",
"isp": "Google LLC"
"isp": "Google LLC",
}
monkeypatch.setattr(scanner, "get_location_data", mock_get_location_data)
@@ -70,9 +75,29 @@ def test_scan_returns_ip(monkeypatch):
assert output[0].country == "US"
assert output[0].isp == "Google LLC"
def test_schemas():
    """Input and output schemas must both equal the full ``Ip`` description."""
    nullable_number = "number | null"
    nullable_string = "string | null"
    # The scanner consumes and produces the same type, so one expected
    # schema serves both comparisons.
    ip_schema = {
        "type": "Ip",
        "properties": [
            {"name": "address", "type": "string"},
            {"name": "latitude", "type": nullable_number},
            {"name": "longitude", "type": nullable_number},
            {"name": "country", "type": nullable_string},
            {"name": "city", "type": nullable_string},
            {"name": "isp", "type": nullable_string},
        ],
    }

    actual_input = scanner.input_schema()
    actual_output = scanner.output_schema()

    assert actual_input == ip_schema
    assert actual_output == ip_schema

View File

@@ -10,18 +10,20 @@ logger = TestLogger()
# conftest.py replaces the production Logger with TestLogger automatically;
# the module-level `logger` is additionally handed to the scanner explicitly.
scanner = IpToAsnScanner("sketch_123", "scan_123", logger)
def test_preprocess_valid_ips():
    """Valid ``Ip`` objects come back from ``preprocess`` in the same order."""
    valid_inputs = [Ip(address=addr) for addr in ("8.8.8.8", "1.1.1.1")]

    preprocessed = scanner.preprocess(valid_inputs)

    kept = [item.address for item in preprocessed]
    wanted = [item.address for item in valid_inputs]
    assert kept == wanted
def test_unprocessed_valid_ips():
ips = [
"8.8.8.8",
@@ -30,8 +32,9 @@ def test_unprocessed_valid_ips():
result = scanner.preprocess(ips)
result_ips = [ip for ip in result]
expected_ips = [Ip(address=ip) for ip in ips]
assert result_ips == expected_ips
assert result_ips == expected_ips
def test_preprocess_invalid_ips():
ips = [
Ip(address="8.8.8.8"),
@@ -45,6 +48,7 @@ def test_preprocess_invalid_ips():
assert "192.168.1.1" in result_addresses
assert "invalid_ip" not in result_addresses
def test_preprocess_multiple_formats():
ips = [
{"address": "8.8.8.8"},
@@ -58,7 +62,9 @@ def test_preprocess_multiple_formats():
assert "8.8.8.8" in result_addresses
assert "192.168.1.1" in result_addresses
assert "10.0.0.1" in result_addresses
assert "1.1.1.1" not in result_addresses # Should be filtered out due to invalid key
assert (
"1.1.1.1" not in result_addresses
) # Should be filtered out due to invalid key
def test_scan_extracts_asn_info(monkeypatch):
@@ -67,7 +73,7 @@ def test_scan_extracts_asn_info(monkeypatch):
"as_number": "AS15169",
"as_name": "GOOGLE",
"as_country": "US",
"as_range": ["8.8.8.0/24", "8.8.4.0/24"]
"as_range": ["8.8.8.0/24", "8.8.4.0/24"],
}
class MockSubprocessResult:
@@ -85,10 +91,10 @@ def test_scan_extracts_asn_info(monkeypatch):
input_data = [Ip(address="8.8.8.8")]
asns = scanner.scan(input_data)
assert isinstance(asns, list)
assert len(asns) == 1
asn = asns[0]
assert isinstance(asn, ASN)
assert asn.number == 15169 # AS15169 -> 15169
@@ -113,10 +119,10 @@ def test_scan_handles_no_asn_found(monkeypatch):
input_data = [Ip(address="192.168.1.1")]
asns = scanner.scan(input_data)
assert isinstance(asns, list)
assert len(asns) == 1
asn = asns[0]
assert isinstance(asn, ASN)
assert asn.number == 0
@@ -133,10 +139,10 @@ def test_scan_handles_subprocess_exception(monkeypatch):
input_data = [Ip(address="8.8.8.8")]
asns = scanner.scan(input_data)
assert isinstance(asns, list)
assert len(asns) == 1
asn = asns[0]
assert isinstance(asn, ASN)
assert asn.number == 0
@@ -151,15 +157,15 @@ def test_scan_multiple_ips(monkeypatch):
"as_number": "AS15169",
"as_name": "GOOGLE",
"as_country": "US",
"as_range": ["8.8.8.0/24"]
"as_range": ["8.8.8.0/24"],
},
"1.1.1.1": {
"input": "1.1.1.1",
"as_number": "AS13335",
"as_name": "CLOUDFLARE",
"as_country": "US",
"as_range": ["1.1.1.0/24"]
}
"as_range": ["1.1.1.0/24"],
},
}
class MockSubprocessResult:
@@ -176,13 +182,13 @@ def test_scan_multiple_ips(monkeypatch):
input_data = [Ip(address="8.8.8.8"), Ip(address="1.1.1.1")]
asns = scanner.scan(input_data)
assert len(asns) == 2
# Check first ASN
assert asns[0].number == 15169
assert asns[0].name == "GOOGLE"
# Check second ASN
assert asns[1].number == 13335
assert asns[1].name == "CLOUDFLARE"
@@ -191,13 +197,15 @@ def test_scan_multiple_ips(monkeypatch):
def test_schemas():
input_schema = scanner.input_schema()
output_schema = scanner.output_schema()
# Input schema should have address field
assert "properties" in input_schema
address_prop = next((prop for prop in input_schema["properties"] if prop["name"] == "address"), None)
address_prop = next(
(prop for prop in input_schema["properties"] if prop["name"] == "address"), None
)
assert address_prop is not None
assert address_prop["type"] == "string"
# Output schema should have ASN fields
assert "properties" in output_schema
prop_names = [prop["name"] for prop in output_schema["properties"]]
@@ -211,20 +219,22 @@ def test_postprocess_creates_neo4j_relationships(monkeypatch):
# Mock Neo4j connection
mock_neo4j = Mock()
scanner.neo4j_conn = mock_neo4j
input_data = [Ip(address="8.8.8.8")]
asn_results = [ASN(
number=15169,
name="GOOGLE",
country="US",
cidrs=[CIDR(network="8.8.8.0/24")]
)]
asn_results = [
ASN(
number=15169,
name="GOOGLE",
country="US",
cidrs=[CIDR(network="8.8.8.0/24")],
)
]
result = scanner.postprocess(asn_results, input_data)
# Verify Neo4j query was called
mock_neo4j.query.assert_called_once()
# Check the query parameters
call_args = mock_neo4j.query.call_args
params = call_args[0][1]
@@ -233,7 +243,7 @@ def test_postprocess_creates_neo4j_relationships(monkeypatch):
assert params["asn_name"] == "GOOGLE"
assert params["asn_country"] == "US"
assert params["sketch_id"] == "sketch_123"
# Should return the same results
assert result == asn_results
@@ -242,19 +252,16 @@ def test_postprocess_skips_unknown_asns(monkeypatch):
# Mock Neo4j connection
mock_neo4j = Mock()
scanner.neo4j_conn = mock_neo4j
input_data = [Ip(address="192.168.1.1")]
asn_results = [ASN(
number=0, # Unknown ASN
name="Unknown",
country="Unknown",
cidrs=[]
)]
asn_results = [
ASN(number=0, name="Unknown", country="Unknown", cidrs=[]) # Unknown ASN
]
result = scanner.postprocess(asn_results, input_data)
# Verify Neo4j query was NOT called for unknown ASN
mock_neo4j.query.assert_not_called()
# Should return the same results
assert result == asn_results
assert result == asn_results

View File

@@ -3,17 +3,15 @@ from flowsint_types.organization import Organization
# Module-level OrgToInfosScanner instance shared by the tests below.
scanner = OrgToInfosScanner("sketch_123", "scan_123")
def test_preprocess_valid_names():
    """``preprocess`` accepts an Organization, a dict and a bare string alike."""
    mixed_inputs = [Organization(name="OpenAI"), {"name": "Inria"}, "OVH"]

    names = [org.name for org in scanner.preprocess(mixed_inputs)]

    assert names == ["OpenAI", "Inria", "OVH"]
# def test_preprocess_invalid_entries():
# data = [
# {"wrong_key": "value"},
@@ -25,6 +23,7 @@ def test_preprocess_valid_names():
# result = scanner.preprocess(data)
# assert result == []
def test_execute():
    """Smoke test: call ``execute`` with a single name.

    Nothing about the result is checked; the test only fails if the call
    raises.
    """
    # NOTE(review): if `execute` is a coroutine function this call never
    # actually runs it -- confirm against the scanner base class.
    names = ["Karim Terrache"]
    scanner.execute(names)
    assert True

View File

@@ -1,9 +1,10 @@
from pathlib import Path
from flowsint_transforms.socials.maigret import MaigretScanner
from flowsint_types.social import Social
from flowsint_types.social import SocialProfile
# Module-level MaigretScanner instance shared by the tests below.
scanner = MaigretScanner("sketch_123", "scan_123")
def test_unprocessed_valid_usernames():
usernames = [
"toto123",
@@ -11,14 +12,15 @@ def test_unprocessed_valid_usernames():
]
result = scanner.preprocess(usernames)
result_usernames = [d for d in result]
expected_usernames = [Social(username=d) for d in usernames]
assert result_usernames == expected_usernames
expected_usernames = [SocialProfile(username=d) for d in usernames]
assert result_usernames == expected_usernames
def test_preprocess_invalid_usernames():
usernames = [
Social(username="toto123"),
Social(username="DorianXd78_Official"),
Social(username="This is not a username"),
SocialProfile(username="toto123"),
SocialProfile(username="DorianXd78_Official"),
SocialProfile(username="This is not a username"),
]
result = scanner.preprocess(usernames)
@@ -27,11 +29,12 @@ def test_preprocess_invalid_usernames():
assert "DorianXd78_Official" in result_usernames
assert "This is not a username" not in result_usernames
def test_preprocess_multiple_formats():
usernames = [
{"username": "toto123"},
{"invalid_key": "ValId_UseRnAme"},
Social(username="DorianXd78_Official"),
SocialProfile(username="DorianXd78_Official"),
"MySimpleUsername",
]
result = scanner.preprocess(usernames)
@@ -41,13 +44,14 @@ def test_preprocess_multiple_formats():
assert "DorianXd78_Official" in result_usernames
assert "ValId_UseRnAme" not in result_usernames
assert "MySimpleUsername" in result_usernames
def test_parsing_invalid_output_file():
    """A report path that does not exist must parse to an empty list."""
    missing_report = Path("/this/path/does/not/exist")
    assert scanner.parse_maigret_output("toto123", missing_report) == []
def test_parsing():
    """Parsing the report at ``/tmp/maigret_test.json`` must yield two results."""
    # NOTE(review): presumably this file has to be put in place outside the
    # test run -- nothing visible here creates it; confirm.
    report_path = Path("/tmp/maigret_test.json")
    results = scanner.parse_maigret_output("toto123", report_path)
    print(results)
    assert len(results) == 2

View File

@@ -1,5 +1,5 @@
import pytest
from unittest.mock import Mock, patch, call
from unittest.mock import Mock, patch
from flowsint_transforms.websites.to_links import WebsiteToLinks
from flowsint_types.website import Website
@@ -14,10 +14,10 @@ class MockCrawler:
def __init__(self, url, recursive=True, verbose=False, _on_result_callback=None):
    """Store the target URL and the per-link callback.

    ``recursive`` and ``verbose`` are accepted for signature compatibility
    but are not stored or used.
    """
    self.callback = _on_result_callback
    self.url = url
def fetch(self):
    """Do nothing; the mock crawler has no fetch step."""
    return None
def extract_urls(self):
# Simulate callback calls
if self.callback:
@@ -25,11 +25,14 @@ class MockCrawler:
self.callback("https://example.com/page2", is_external=False)
self.callback("https://external.com/page", is_external=True)
self.callback("https://another-external.org/resource", is_external=True)
def get_results(self):
    """Return the fixed crawl summary: two internal and two external URLs."""
    internal_urls = ["https://example.com/page1", "https://example.com/page2"]
    external_urls = [
        "https://external.com/page",
        "https://another-external.org/resource",
    ]
    return MockCrawlResults(internal=internal_urls, external=external_urls)
@@ -37,102 +40,210 @@ class MockCrawler:
async def test_website_to_links_real_time_neo4j_creation():
    """Test that Neo4j nodes are created in real-time during the callback.

    Runs ``scan`` on a single website with ``MockCrawler`` substituted for the
    real crawler, then checks that ``create_node`` and ``create_relationship``
    were called for the main website, its domain, every internal page and
    every external page together with its domain.
    """
    main_url = "https://example.com"
    main_domain = "example.com"
    internal_urls = ["https://example.com/page1", "https://example.com/page2"]
    # External page URL -> the domain node expected for it.
    external_links = {
        "https://external.com/page": "external.com",
        "https://another-external.org/resource": "another-external.org",
    }

    scanner = WebsiteToLinks(sketch_id="test", scan_id="test")

    # Mock neo4j connection and methods
    scanner.neo4j_conn = Mock()
    scanner.create_node = Mock()
    scanner.create_relationship = Mock()
    scanner.log_graph_message = Mock()

    # Patch Crawler in the module WebsiteToLinks is imported from: a patch
    # only takes effect in the namespace where the name is looked up.
    with patch("flowsint_transforms.websites.to_links.Crawler", MockCrawler):
        await scanner.scan([Website(url=main_url)])

    def assert_website_node(url):
        # A website node is keyed by its URL and captioned with it.
        scanner.create_node.assert_any_call(
            "website", "url", url, caption=url, type="website"
        )

    def assert_domain_node(domain):
        # A domain node is keyed by its name and captioned with it.
        scanner.create_node.assert_any_call(
            "domain", "name", domain, caption=domain, type="domain"
        )

    def assert_relationship(source_url, target_label, target_key, target_value, relation):
        # Every relationship checked here starts from a website node.
        scanner.create_relationship.assert_any_call(
            "website",
            "url",
            source_url,
            target_label,
            target_key,
            target_value,
            relation,
        )

    # Verify main website and domain nodes were created upfront
    assert_website_node(main_url)
    assert_domain_node(main_domain)
    assert_relationship(main_url, "domain", "name", main_domain, "BELONGS_TO_DOMAIN")

    # Verify internal website nodes and their links from the main website
    for page_url in internal_urls:
        assert_website_node(page_url)
        assert_relationship(main_url, "website", "url", page_url, "LINKS_TO")

    # Verify external website/domain nodes and all three relationships each
    for page_url, domain in external_links.items():
        assert_website_node(page_url)
        assert_domain_node(domain)
        assert_relationship(main_url, "website", "url", page_url, "LINKS_TO")
        assert_relationship(page_url, "domain", "name", domain, "BELONGS_TO_DOMAIN")
        assert_relationship(main_url, "domain", "name", domain, "LINKS_TO_DOMAIN")
@pytest.mark.asyncio
async def test_website_to_links_error_handling_with_neo4j():
"""Test that main nodes are still created even when crawling fails."""
scanner = WebsiteToLinks(sketch_id="test", scan_id="test")
# Mock neo4j connection and methods
scanner.neo4j_conn = Mock()
scanner.create_node = Mock()
scanner.create_relationship = Mock()
scanner.log_graph_message = Mock()
# Mock crawler that raises an exception
def mock_crawler_error(*args, **kwargs):
raise Exception("Test error")
websites = [Website(url="https://example.com")]
with patch('src.transforms.websites.to_links.Crawler', mock_crawler_error):
with patch("src.transforms.websites.to_links.Crawler", mock_crawler_error):
results = await scanner.scan(websites)
# Verify main website and domain nodes were still created despite error
scanner.create_node.assert_any_call('website', 'url', 'https://example.com',
caption='https://example.com', type='website')
scanner.create_node.assert_any_call('domain', 'name', 'example.com',
caption='example.com', type='domain')
scanner.create_node.assert_any_call(
"website",
"url",
"https://example.com",
caption="https://example.com",
type="website",
)
scanner.create_node.assert_any_call(
"domain", "name", "example.com", caption="example.com", type="domain"
)
# Verify main website to domain relationship was created
scanner.create_relationship.assert_any_call('website', 'url', 'https://example.com',
'domain', 'name', 'example.com', 'BELONGS_TO_DOMAIN')
scanner.create_relationship.assert_any_call(
"website",
"url",
"https://example.com",
"domain",
"name",
"example.com",
"BELONGS_TO_DOMAIN",
)
# Verify result structure
assert len(results) == 1
result = results[0]
@@ -146,17 +257,19 @@ async def test_website_to_links_error_handling_with_neo4j():
def test_postprocess_simplified():
    """Test that postprocess now just returns results as-is."""
    crawl_summary = {
        "website": "https://example.com",
        "main_domain": "example.com",
        "internal_urls": ["https://example.com/page1"],
        "external_urls": ["https://external.com/page"],
        "external_domains": ["external.com"],
    }
    results = [crawl_summary]

    scanner = WebsiteToLinks(sketch_id="test", scan_id="test")
    original_input = [Website(url="https://example.com")]

    # The return value must compare equal to what was passed in.
    assert scanner.postprocess(results, original_input) == results

View File

@@ -1,42 +1,52 @@
import re
from typing import Dict
from app.tools.network.asnmap import AsnmapTool
from tools.network.asnmap import AsnmapTool
# Single AsnmapTool instance shared by every test in this module.
tool = AsnmapTool()
def test_name():
    """The tool reports ``asnmap`` as its name."""
    expected = "asnmap"
    assert tool.name() == expected
def test_description():
    """The tool's description matches the expected text exactly."""
    expected = "ASN mapping and network reconnaissance tool."
    assert tool.description() == expected
def test_category():
    """The tool is filed under the ``ASN discovery`` category."""
    expected = "ASN discovery"
    assert tool.category() == expected
def test_image():
    """``get_image`` returns the ``projectdiscovery/asnmap`` image name."""
    expected = "projectdiscovery/asnmap"
    assert tool.get_image() == expected
def test_install():
    """After ``install`` the tool must report itself as installed."""
    tool.install()
    # Assert truthiness directly rather than comparing to True with `==`.
    assert tool.is_installed()
def test_version():
    """The reported version looks like ``v`` followed by digits and dots."""
    version_pattern = re.compile(r"^v[\d\.]+$")
    tool.install()
    reported = tool.version()
    assert version_pattern.match(reported)
def test_launch_no_api_key():
    """``launch`` must raise ``KeyError`` whose message matches "Missing key"."""
    import pytest

    # NOTE(review): test_launch in this module makes the same call and
    # expects it to succeed -- presumably the two depend on whether an API
    # key is configured in the environment; confirm.
    expect_missing_key = pytest.raises(KeyError, match="Missing key")
    with expect_missing_key:
        tool.launch("alliage.io", "domain")
def test_launch_wrong_type():
    """An unsupported lookup type (``domains``) is rejected with ``ValueError``."""
    import pytest

    expect_invalid_type = pytest.raises(ValueError, match="Invalid type: 'domains'")
    with expect_invalid_type:
        tool.launch("alliage.io", "domains")
def test_launch():
    """A ``domain`` lookup returns a dictionary."""
    lookup = tool.launch("alliage.io", "domain")
    assert isinstance(lookup, dict)

View File

@@ -1,40 +1,50 @@
import re
from typing import List
from app.tools.network.httpx import HttpxTool
from tools.network.httpx import HttpxTool
# Single HttpxTool instance shared by every test in this module.
tool = HttpxTool()
def test_name():
    """The tool reports ``httpx`` as its name."""
    expected = "httpx"
    assert tool.name() == expected
def test_description():
    """The tool's description matches the expected text exactly."""
    expected = (
        "An HTTP toolkit that probes services, web servers, and other valuable metadata."
    )
    assert tool.description() == expected
def test_category():
    """The tool is filed under ``Web technologies enumeration``."""
    expected = "Web technologies enumeration"
    assert tool.category() == expected
def test_image():
    """``get_image`` returns the ``projectdiscovery/httpx`` image name."""
    expected = "projectdiscovery/httpx"
    assert tool.get_image() == expected
def test_install():
    """After ``install`` the tool must report itself as installed."""
    tool.install()
    # Assert truthiness directly rather than comparing to True with `==`.
    assert tool.is_installed()
def test_version():
    """The reported version looks like ``v`` followed by digits and dots."""
    version_pattern = re.compile(r"^v[\d\.]+$")
    tool.install()
    reported = tool.version()
    assert version_pattern.match(reported)
def test_launch():
    """Probing a URL returns a list.

    The placeholder ``assert True`` that used to open this test checked
    nothing and has been dropped; the builtin ``list`` replaces the
    ``typing.List`` alias in the ``isinstance`` check.
    """
    results = tool.launch("https://alliage.io")
    print(results)
    assert isinstance(results, list)
def test_launch_unreached_host():
    """Probing ``this-is-not-a-valid-domain.local`` returns an empty list.

    The placeholder ``assert True`` that used to open this test checked
    nothing and has been dropped; the builtin ``list`` replaces the
    ``typing.List`` alias in the ``isinstance`` check.
    """
    results = tool.launch("https://this-is-not-a-valid-domain.local")
    assert isinstance(results, list)
    assert len(results) == 0

View File

@@ -1,20 +1,23 @@
import re
from typing import Dict
from app.tools.network.reconcrawl import ReconCrawlTool
from tools.network.reconcrawl import ReconCrawlTool
# Single ReconCrawlTool instance shared by every test in this module.
tool = ReconCrawlTool()
def test_name():
    """The tool reports ``reconcrawl`` as its name."""
    expected = "reconcrawl"
    assert tool.name() == expected
def test_description():
    """The tool's description matches the expected text exactly."""
    expected = (
        "Emails and phone numbers crawler from websites by analyzing their HTML and embedded scripts."
    )
    assert tool.description() == expected
def test_category():
    """The tool is filed under the ``Crawler`` category."""
    expected = "Crawler"
    assert tool.category() == expected
def test_install():
    """After ``install`` the tool must report itself as installed."""
    tool.install()
    # Assert truthiness directly rather than comparing to True with `==`.
    assert tool.is_installed()

View File

@@ -1,32 +1,38 @@
import re
from app.tools.network.subfinder import SubfinderTool
from tools.network.subfinder import SubfinderTool
# Single SubfinderTool instance shared by every test in this module.
tool = SubfinderTool()
def test_name():
    """The tool reports ``subfinder`` as its name."""
    expected = "subfinder"
    assert tool.name() == expected
def test_description():
    """The tool's description matches the expected text exactly."""
    expected = "Fast passive subdomain enumeration tool."
    assert tool.description() == expected
def test_category():
    """The tool is filed under the ``Subdomain enumeration`` category."""
    expected = "Subdomain enumeration"
    assert tool.category() == expected
def test_image():
    """``get_image`` returns the ``projectdiscovery/subfinder`` image name."""
    expected = "projectdiscovery/subfinder"
    assert tool.get_image() == expected
def test_install():
    """After ``install`` the tool must report itself as installed."""
    tool.install()
    # Assert truthiness directly rather than comparing to True with `==`.
    assert tool.is_installed()
def test_version():
    """The reported version looks like ``v`` followed by digits and dots."""
    version_pattern = re.compile(r"^v[\d\.]+$")
    tool.install()
    reported = tool.version()
    assert version_pattern.match(reported)
def test_launch():
    """``launch`` returns a list made up solely of strings."""
    subdomains = tool.launch("alliage.io")
    assert isinstance(subdomains, list)
    for entry in subdomains:
        assert isinstance(entry, str)

View File

@@ -1,28 +1,36 @@
import re
from typing import Dict
from app.tools.organizations.sirene import SireneTool
from tools.organizations.sirene import SireneTool
# Single SireneTool instance shared by every test in this module.
tool = SireneTool()
def test_name():
    """The tool reports ``sirene`` as its name."""
    expected = "sirene"
    assert tool.name() == expected
def test_description():
    """The tool's description matches the expected text exactly."""
    expected = (
        "The Sirene API allows you to query the Sirene directory of businesses and establishments, managed by Insee."
    )
    assert tool.description() == expected
def test_category():
    """The tool is filed under the ``Business intelligence`` category."""
    expected = "Business intelligence"
    assert tool.category() == expected
def test_launch_org():
    """Searching for ``blablacar`` returns a list of dictionaries."""
    records = tool.launch("blablacar", 1)
    assert isinstance(records, list)
    for record in records:
        assert isinstance(record, dict)
def test_launch_person():
    """Searching a ``+``-joined name returns a list of dictionaries."""
    records = tool.launch("Karim+Terrache", 1)
    assert isinstance(records, list)
    for record in records:
        assert isinstance(record, dict)
def test_launch_person_space_format():
    """Searching a space-separated name still returns a list."""
    records = tool.launch("Karim Terrache", 1)
    assert isinstance(records, list)

View File

@@ -1,41 +1,40 @@
from flowsint_core.core.scanner_base import build_params_model
def test_build_params_model_valid():
    """A required parameter and a defaulted optional one both validate.

    Only the required key is supplied; the optional ``url`` must fall back to
    the default declared in the schema.
    """
    api_key_param = {
        "name": "ETHERSCAN_API_KEY",
        "type": "string",
        "description": "The Etherscan API key to use for the transaction lookup.",
        "required": True,
    }
    url_param = {
        "name": "url",
        "type": "string",
        "description": "Base URL for API",
        "required": False,
        "default": "https://api.etherscan.io/api",
    }

    ParamsModel = build_params_model([api_key_param, url_param])
    validated_params = ParamsModel(ETHERSCAN_API_KEY="clef-123")

    assert validated_params.ETHERSCAN_API_KEY == "clef-123"
    assert validated_params.url == "https://api.etherscan.io/api"
def test_build_params_model_invalid():
    """Build a model from a schema whose first entry is an empty dict.

    NOTE(review): despite the name, the assertions are identical to the
    valid case and expect construction to succeed -- confirm whether an
    error was meant to be asserted here instead.
    """
    empty_param = {}
    url_param = {
        "name": "url",
        "type": "string",
        "description": "Base URL for API",
        "required": False,
        "default": "https://api.etherscan.io/api",
    }

    ParamsModel = build_params_model([empty_param, url_param])
    validated_params = ParamsModel(ETHERSCAN_API_KEY="clef-123")

    assert validated_params.ETHERSCAN_API_KEY == "clef-123"
    assert validated_params.url == "https://api.etherscan.io/api"