Add on the fly Sigma YAML rule conversion with pySigma

Add conditional imports so that missing optional dependencies only disable the related features
Add option groups to improve help readability
Correct typo in docs
Update some error messages
Bump version to 2.20.0
This commit is contained in:
wagga40
2024-01-02 15:03:51 +01:00
parent 3d2a94f568
commit 7fe72c6305
6 changed files with 830 additions and 260 deletions

View File

@@ -3,7 +3,7 @@
DOCKER?=docker
DOCKER_BUILD_FLAGS?=
DOCKER_REGISTRY?=docker.io
DOCKER_TAG?=2.10
DOCKER_TAG?=2.20.0
GIT?=git
PY3?=python3
DATE=$(shell date +%s)

View File

@@ -27,7 +27,7 @@ git clone https://github.com/sbousseaden/EVTX-ATTACK-SAMPLES.git
### Installation from repository
#### Using [*venv*](https://packaging.python.org/en/latest/guides/installing-using-pip-and-virtual-environments/) on Linux/MacOS
#### Using [venv](https://packaging.python.org/en/latest/guides/installing-using-pip-and-virtual-environments/) on Linux/MacOS
**Requirements** : Python 3 venv
@@ -45,7 +45,7 @@ python3 zircolite.py -e EVTX-ATTACK-SAMPLES/ -r rules/rules_windows_sysmon_pysig
deactivate # Quit Python3 venv
```
#### Using [*Pdm*](https://pdm-project.org/latest/) or [Poetry](https://python-poetry.org)
#### Using [Pdm](https://pdm-project.org/latest/) or [Poetry](https://python-poetry.org)
```shell
# INSTALL
@@ -364,14 +364,14 @@ Default rulesets are already provided in the `rules` directory. These rulesets o
### Generate rulesets using PySigma
#### Using [*Pdm*](https://pdm-project.org/latest/) or [Poetry](https://python-poetry.org)
#### Using [Pdm](https://pdm-project.org/latest/) or [Poetry](https://python-poetry.org)
```shell
# INSTALL
git clone https://github.com/SigmaHQ/sigma.git
cd sigma
pdm init -n
pdm add pysigma sigma-cli pysigma-pipeline-sysmon pysigma-pipeline-windows pysigma-backend-sqlite
pdm add pysigma pip sigma-cli pysigma-pipeline-sysmon pysigma-pipeline-windows pysigma-backend-sqlite
# GENERATE RULESET (SYSMON)
pdm run sigma convert -t sqlite -f zircolite -p sysmon -p windows-logsources sigma/rules/windows/ -s -o rules.json

Binary file not shown.

BIN
pics/Zircolite.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 126 KiB

View File

@@ -4,6 +4,7 @@
import argparse
import asyncio
import csv
import functools
import hashlib
import logging
import multiprocessing as mp
@@ -22,21 +23,67 @@ from pathlib import Path
from sqlite3 import Error
from sys import platform as _platform
# External libs
import aiohttp
# External libs (Mandatory)
import orjson as json
import requests
import urllib3
import xxhash
from colorama import Fore
from elasticsearch import AsyncElasticsearch
from evtx import PyEvtxParser
from jinja2 import Template
from lxml import etree
from tqdm import tqdm
from tqdm.asyncio import tqdm as tqdmAsync
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# External libs (Optional)
# Each optional dependency is imported inside its own guard; when an import
# fails, only the matching feature flag is raised so the rest of the tool
# keeps working.
forwardingDisabled = False
try:
    import aiohttp
    import urllib3
except ImportError:
    forwardingDisabled = True
else:
    # Forwarding targets frequently use self-signed certificates
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

elasticForwardingDisabled = False
try:
    from elasticsearch import AsyncElasticsearch
except ImportError:
    elasticForwardingDisabled = True

updateDisabled = False
try:
    import requests
except ImportError:
    # requests is needed by both the event forwarder and the rules updater
    forwardingDisabled = True
    updateDisabled = True

sigmaConversionDisabled = False
try:
    import yaml
    from sigma.backends.sqlite import sqlite
    from sigma.collection import SigmaCollection
    from sigma.pipelines.sysmon import sysmon_pipeline
    from sigma.pipelines.windows import (
        windows_audit_pipeline,
        windows_logsource_pipeline,
    )
    from sigma.processing.resolver import ProcessingPipelineResolver
except ImportError:
    sigmaConversionDisabled = True

pyevtxDisabled = False
try:
    from evtx import PyEvtxParser
except ImportError:
    pyevtxDisabled = True

jinja2Disabled = False
try:
    from jinja2 import Template
except ImportError:
    jinja2Disabled = True

xmlImportDisabled = False
try:
    from lxml import etree
except ImportError:
    xmlImportDisabled = True
def signal_handler(sig, frame):
@@ -909,7 +956,7 @@ class zirCore:
self.applyRulesetFilters(ruleFilters)
except Exception as e:
self.logger.error(
f"{Fore.RED} [-] Load JSON ruleset failed, are you sure it is a valid JSON file ? : {e}{Fore.RESET}"
f"{Fore.RED} [-] Loading JSON ruleset failed, are you sure it is a valid JSON file ? : {e}{Fore.RESET}"
)
def loadRulesetFromVar(self, ruleset, ruleFilters):
@@ -1103,11 +1150,6 @@ class evtxExtractor:
for _ in range(8)
)
def makeExecutable(self, path):
mode = os.stat(path).st_mode
mode |= (mode & 0o444) >> 2
os.chmod(path, mode)
def getOSExternalTools(self, binPath):
"""Determine which binaries to run depending on host OS : 32Bits is NOT supported for now since evtx_dump is 64bits only"""
if binPath is None:
@@ -1125,21 +1167,28 @@ class evtxExtractor:
Convert EVTX to JSON using evtx_dump bindings (slower)
Drop resulting JSON files in a tmp folder.
"""
try:
filepath = Path(file)
filename = filepath.name
parser = PyEvtxParser(str(filepath))
with open(
f"{self.tmpDir}/{str(filename)}-{self.randString()}.json",
"w",
encoding="utf-8",
) as f:
for record in parser.records_json():
f.write(
f'{json.dumps(json.loads(record["data"])).decode("utf-8")}\n'
)
except Exception as e:
self.logger.error(f"{Fore.RED} [-] {e}{Fore.RESET}")
if not self.useExternalBinaries:
try:
filepath = Path(file)
filename = filepath.name
parser = PyEvtxParser(str(filepath))
with open(
f"{self.tmpDir}/{str(filename)}-{self.randString()}.json",
"w",
encoding="utf-8",
) as f:
for record in parser.records_json():
f.write(
f'{json.dumps(json.loads(record["data"])).decode("utf-8")}\n'
)
except Exception as e:
self.logger.error(
f"{Fore.RED} [-] Cannot use PyEvtxParser{Fore.RESET}"
)
else:
self.logger.error(
f"{Fore.RED} [-] Cannot use PyEvtxParser and evtx_dump is disabled or missing{Fore.RESET}"
)
def getTime(self, line):
timestamp = line.replace("msg=audit(", "").replace("):", "").split(":")
@@ -1517,6 +1566,181 @@ class rulesUpdater:
self.logger.error(f" [-] {e}")
class rulesetHandler:
    """Load Zircolite JSON rulesets and convert native Sigma (YAML) rules on the fly.

    The final, possibly combined, list of rulesets is stored in ``self.Rulesets``
    (a list of rulesets, each ruleset being a list of rule dicts).
    """

    def __init__(self, logger=None, config=None):
        """Parse every path in ``config.ruleset``, converting Sigma YAML when needed.

        Args:
            logger: optional logger; falls back to the module logger.
            config: argparse namespace (save_ruleset, ruleset, cores,
                no_sigma_conversion, pipeline_sysmon, pipeline_windows,
                combine_ruleset are read here).
        """
        self.logger = logger or logging.getLogger(__name__)
        self.saveRuleset = config.save_ruleset
        self.rulesetPathList = config.ruleset
        self.cores = config.cores or os.cpu_count()
        self.sigmaConversionDisabled = config.no_sigma_conversion
        if self.sigmaConversionDisabled:
            self.logger.info(
                f"{Fore.LIGHTYELLOW_EX} [i] SIGMA conversion is disabled ! {Fore.RESET}"
            )
        # Resolving pipelines used for the Sigma -> Zircolite conversion
        if config.pipeline_sysmon:
            self.pipelines = [sysmon_pipeline(), windows_logsource_pipeline()]
        elif config.pipeline_windows:
            self.pipelines = [windows_audit_pipeline(), windows_logsource_pipeline()]
        else:
            self.pipelines = []
        # Parse & (if necessary) convert rulesets; final list is stored in self.Rulesets
        self.Rulesets = self.rulesetParsing()
        # Combining all rulesets into a single one, sorted by rule level
        if config.combine_ruleset:
            self.Rulesets = [
                item
                for subRuleset in self.Rulesets
                if subRuleset
                for item in subRuleset
            ]
            self.Rulesets = [
                sorted(self.Rulesets, key=lambda d: d["level"])
            ]  # Sorting by level
        if all(not subRuleset for subRuleset in self.Rulesets):
            self.logger.error(f"{Fore.RED} [-] No rules to execute !{Fore.RESET}")

    def isYAML(self, filepath):
        """Return True if *filepath* has a YAML suffix and parses as valid YAML."""
        if filepath.suffix not in (".yml", ".yaml"):
            # Explicit False instead of the previous implicit None
            return False
        with open(filepath, "r", encoding="utf-8") as file:
            content = file.read()
        try:
            yaml.safe_load(content)
            return True
        except yaml.YAMLError:
            return False

    def isJSON(self, filepath):
        """Return True if *filepath* has a '.json' suffix and parses as valid JSON."""
        if filepath.suffix != ".json":
            # Explicit False instead of the previous implicit None
            return False
        with open(filepath, "r", encoding="utf-8") as file:
            content = file.read()
        try:
            json.loads(content)
            return True
        except json.JSONDecodeError:
            return False

    def randRulesetName(self, sigmaRules):
        """Build a filesystem-safe, collision-resistant output filename for a ruleset."""
        # Clean the ruleset name: keep alphanumerics, collapse everything else to '-'
        cleanedName = "".join(
            char if char.isalnum() else "-" for char in sigmaRules
        ).strip("-")
        cleanedName = re.sub(r"-+", "-", cleanedName)
        # Random suffix to avoid overwriting a previous conversion
        randomString = "".join(
            random.SystemRandom().choice(string.ascii_uppercase + string.digits)
            for _ in range(8)
        )
        return f"ruleset-{cleanedName}-{randomString}.json"

    def convertSigmaRules(self, backend, rule):
        """Convert a single Sigma rule with *backend*; return None on failure."""
        try:
            return backend.convert_rule(rule, "zircolite")[0]
        except Exception as e:
            self.logger.debug(
                f"{Fore.RED} [-] Cannot convert rule '{str(rule)}' : {e}{Fore.RESET}"
            )

    def sigmaRulesToRuleset(self, SigmaRulesList, pipelines):
        """Convert Sigma rule files/directories to one Zircolite ruleset (list of dicts).

        All entries of *SigmaRulesList* are processed (callers currently pass a
        single-element list). Rules that fail to convert are dropped.
        """
        fullRuleset = []
        for sigmaRules in SigmaRulesList:
            # Create the pipeline resolver and register the requested pipelines
            piperesolver = ProcessingPipelineResolver()
            for pipeline in pipelines:
                piperesolver.add_pipeline_class(pipeline)
            # Create a single sorted and prioritized pipeline
            combined_pipeline = piperesolver.resolve(piperesolver.pipelines)
            # Instantiate backend, using our resolved pipeline
            sqlite_backend = sqlite.sqliteBackend(combined_pipeline)
            rules = Path(sigmaRules)
            if rules.is_dir():
                rule_list = list(rules.rglob("*.yml")) + list(rules.rglob("*.yaml"))
            else:
                rule_list = [rules]
            rule_collection = SigmaCollection.load_ruleset(rule_list)
            # Convert rules in parallel; failed conversions yield None
            pool = mp.Pool(self.cores)
            ruleset = pool.map(
                functools.partial(self.convertSigmaRules, sqlite_backend),
                tqdm(rule_collection, colour="yellow"),
            )
            pool.close()
            pool.join()
            ruleset = [
                rule for rule in ruleset if rule is not None
            ]  # Removing empty results
            ruleset = sorted(ruleset, key=lambda d: d["level"])  # Sorting by level
            if self.saveRuleset:
                # orjson.dumps returns UTF-8 bytes, hence the decode
                with open(
                    self.randRulesetName(str(sigmaRules)), "w", encoding="utf-8"
                ) as outfile:
                    outfile.write(
                        json.dumps(ruleset, option=json.OPT_INDENT_2).decode("utf-8")
                    )
            fullRuleset.extend(ruleset)
        return fullRuleset

    def rulesetParsing(self):
        """Walk ``self.rulesetPathList`` and return a list of loaded/converted rulesets."""
        rulesetList = []
        for ruleset in self.rulesetPathList:
            rulesetPath = Path(ruleset)
            if not rulesetPath.exists():
                # Previously missing paths were silently ignored
                self.logger.error(
                    f"{Fore.RED} [-] Cannot find ruleset : {ruleset}{Fore.RESET}"
                )
                continue
            if rulesetPath.is_file():
                if self.isJSON(rulesetPath):  # JSON Ruleset
                    try:
                        with open(rulesetPath, encoding="utf-8") as f:
                            rulesetList.append(json.loads(f.read()))
                        self.logger.info(
                            f"{Fore.CYAN} [+] Loaded JSON/Zircolite ruleset : {str(rulesetPath)}{Fore.RESET}"
                        )
                    except Exception as e:
                        self.logger.error(
                            f"{Fore.RED} [-] Cannot load {str(rulesetPath)} {e}{Fore.RESET}"
                        )
                else:  # YAML Ruleset
                    # NOTE(review): files that are neither valid JSON nor valid
                    # YAML (or YAML with conversion disabled) are skipped silently
                    if self.isYAML(rulesetPath) and not self.sigmaConversionDisabled:
                        try:
                            self.logger.info(
                                f"{Fore.CYAN} [+] Converting Native SIGMA to Zircolite ruleset : {str(rulesetPath)}{Fore.RESET}"
                            )
                            rulesetList.append(
                                self.sigmaRulesToRuleset([rulesetPath], self.pipelines)
                            )
                        except Exception as e:
                            self.logger.error(
                                f"{Fore.RED} [-] Cannot convert {str(rulesetPath)} {e}{Fore.RESET}"
                            )
            elif rulesetPath.is_dir() and not self.sigmaConversionDisabled:  # Directory
                try:
                    self.logger.info(
                        f"{Fore.CYAN} [+] Converting Native SIGMA to Zircolite ruleset : {str(rulesetPath)}{Fore.RESET}"
                    )
                    rulesetList.append(
                        self.sigmaRulesToRuleset([rulesetPath], self.pipelines)
                    )
                except Exception as e:
                    self.logger.error(
                        f"{Fore.RED} [-] Cannot convert {str(rulesetPath)} {e}{Fore.RESET}"
                    )
        return rulesetList
def selectFiles(pathList, selectFilesList):
if selectFilesList is not None:
return [
@@ -1543,60 +1767,128 @@ def avoidFiles(pathList, avoidFilesList):
return pathList
def ImportErrorHandler(config):
    """Degrade gracefully when optional third-party modules are missing.

    Inspects the module-level availability flags set by the optional-import
    section, disables the matching features on *config*, and builds a
    user-facing report.

    Returns a 3-tuple ``(message, config, mustQuit)`` where *message* is the
    text to display (may be empty), *config* is the possibly-amended argparse
    namespace, and *mustQuit* tells the caller to abort (only when a missing
    module is required by the provided command line).
    """
    importErrorList = []
    if forwardingDisabled:
        importErrorList.append(
            f"{Fore.LIGHTYELLOW_EX} [i] Cannot import 'aiohttp' or 'urllib3' or 'requests', events forwarding is disabled{Fore.RESET}"
        )
        config.remote = None
    if elasticForwardingDisabled:
        importErrorList.append(
            f"{Fore.LIGHTYELLOW_EX} [i] Cannot import 'elasticsearch[async]', events forwarding to Elastic is disabled{Fore.RESET}"
        )
        config.index = None
    if updateDisabled:
        importErrorList.append(
            f"{Fore.LIGHTYELLOW_EX} [i] Cannot import 'requests', events update is disabled{Fore.RESET}"
        )
        config.update_rules = False
    if sigmaConversionDisabled:
        importErrorList.append(
            f"{Fore.LIGHTYELLOW_EX} [i] Cannot import 'sigma' from pySigma, ruleset conversion YAML -> JSON is disabled{Fore.RESET}"
        )
        config.no_sigma_conversion = True
    if pyevtxDisabled:
        importErrorList.append(
            f"{Fore.LIGHTYELLOW_EX} [i] Cannot import 'evtx' from pyevtx-rs, use of external binaries is mandatory{Fore.RESET}"
        )
        # Force external evtx_dump usage since the Python bindings are absent
        config.noexternal = False
    if jinja2Disabled:
        importErrorList.append(
            f"{Fore.LIGHTYELLOW_EX} [i] Cannot import 'jinja2', templating is disabled{Fore.RESET}"
        )
        config.template = None
    if xmlImportDisabled:
        importErrorList.append(
            f"{Fore.LIGHTYELLOW_EX} [i] Cannot import 'lxml', cannot use XML logs as input{Fore.RESET}"
        )
        if config.xml:
            # XML input was explicitly requested but cannot be honoured: abort
            return (
                f"{Fore.RED} [-] Cannot import 'lxml', but according to command line provided it is needed{Fore.RESET}",
                config,
                True,
            )
    if config.debug or config.imports:
        return "\n".join(importErrorList), config, False
    if importErrorList == []:
        return "", config, False
    # BUGFIX: this tuple was missing the 'mustQuit' element, which broke the
    # 3-value unpacking done by the caller in main()
    return (
        f"{Fore.LIGHTYELLOW_EX} [i] Import errors, functions can be disabled ('--imports' for details){Fore.RESET}",
        config,
        False,
    )
################################################################
# MAIN()
################################################################
def main():
version = "2.10.0"
version = "2.20.0"
# Init Args handling
parser = argparse.ArgumentParser()
# Input logs and input files filters
parser.add_argument(
# Input files and filtering/selection options
logsInputArgs = parser.add_argument_group(
"Input files and filtering/selection options"
)
logsInputArgs.add_argument(
"-e",
"--evtx",
"--events",
help="Log file or directory where log files are stored in JSON, Auditd, Sysmon for Linux, or EVTX format",
type=str,
)
parser.add_argument(
logsInputArgs.add_argument(
"-s",
"--select",
help="Only files containing the provided string will be used. If there is/are exclusion(s) (--avoid) they will be handled after selection",
action="append",
nargs="+",
)
parser.add_argument(
logsInputArgs.add_argument(
"-a",
"--avoid",
help="EVTX files containing the provided string will NOT be used",
action="append",
nargs="+",
)
parser.add_argument("-f", "--fileext", help="Extension of the log files", type=str)
parser.add_argument(
logsInputArgs.add_argument(
"-f", "--fileext", help="Extension of the log files", type=str
)
logsInputArgs.add_argument(
"-fp",
"--file-pattern",
help="Use a Python Glob pattern to select files. This option only works with directories",
type=str,
)
# Event filtering related options
parser.add_argument(
logsInputArgs.add_argument(
"--no-recursion",
help="By default Zircolite search log/event files recursively, by using this option only the provided directory will be used",
action="store_true",
)
# Events filtering options
eventArgs = parser.add_argument_group("Events filtering options")
eventArgs.add_argument(
"-A",
"--after",
help="Limit to events that happened after the provided timestamp (UTC). Format : 1970-01-01T00:00:00",
type=str,
default="1970-01-01T00:00:00",
)
parser.add_argument(
eventArgs.add_argument(
"-B",
"--before",
help="Limit to events that happened before the provided timestamp (UTC). Format : 1970-01-01T00:00:00",
type=str,
default="9999-12-12T23:59:59",
)
# Input logs format related options
parser.add_argument(
# Event and log formats options
eventFormatsArgs = parser.add_argument_group("Event and log formats options")
eventFormatsArgs.add_argument(
"-j",
"--jsononly",
"--jsonline",
@@ -1605,21 +1897,21 @@ def main():
help="If logs files are already in JSON lines format ('jsonl' in evtx_dump) ",
action="store_true",
)
parser.add_argument(
eventFormatsArgs.add_argument(
"--jsonarray",
"--json-array",
"--json-array-input",
help="Source logs are in JSON but as an array",
action="store_true",
)
parser.add_argument(
eventFormatsArgs.add_argument(
"-D",
"--dbonly",
"--db-input",
help="Directly use a previously saved database file, timerange filters will not work",
action="store_true",
)
parser.add_argument(
eventFormatsArgs.add_argument(
"-S",
"--sysmon4linux",
"--sysmon-linux",
@@ -1627,214 +1919,263 @@ def main():
help="Use this option if your log file is a Sysmon for linux log file, default file extension is '.log'",
action="store_true",
)
parser.add_argument(
eventFormatsArgs.add_argument(
"-AU",
"--auditd",
"--auditd-input",
help="Use this option if your log file is a Auditd log file, default file extension is '.log'",
action="store_true",
)
parser.add_argument(
eventFormatsArgs.add_argument(
"-x",
"--xml",
"--xml-input",
help="Use this option if your log file is a EVTX converted to XML log file, default file extension is '.xml'",
action="store_true",
)
parser.add_argument(
eventFormatsArgs.add_argument(
"--evtxtract",
help="Use this option if your log file was extracted with EVTXtract, default file extension is '.log'",
action="store_true",
)
parser.add_argument(
eventFormatsArgs.add_argument(
"--csvonly",
"--csv-input",
help="Use this option if your log file was extracted with EVTXtract, default file extension is '.log'",
action="store_true",
)
parser.add_argument(
eventFormatsArgs.add_argument(
"-LE",
"--logs-encoding",
help="Specify log encoding when dealing with Sysmon for Linux or Auditd files",
type=str,
)
# Ruleset related options
parser.add_argument(
# Ruleset options
rulesetsFormatsArgs = parser.add_argument_group("Event and log formats options")
rulesetsFormatsArgs.add_argument(
"-r",
"--ruleset",
help="JSON File containing SIGMA rules",
help="JSON File containing SIGMA rules (Zircolite ruleset)",
action="append",
nargs="+",
)
parser.add_argument(
rulesetsFormatsArgs.add_argument(
"-nsc", "--no-sigma-conversion", help=argparse.SUPPRESS, action="store_true"
)
rulesetsFormatsArgs.add_argument(
"-cr",
"--combine-ruleset",
help="JSON File containing SIGMA rules (Zircolite ruleset)",
action="store_true",
)
rulesetsFormatsArgs.add_argument(
"-sr",
"--save-ruleset",
help="Save converted ruleset (SIGMA to Zircolite format) to disk",
action="store_true",
)
rulesetsFormatsArgs.add_argument(
"-ps",
"--pipeline-sysmon",
help="For all the raw SIGMA rulesets (YAML) provided use the sysmon pipeline",
action="store_true",
)
rulesetsFormatsArgs.add_argument(
"-pw",
"--pipeline-windows",
help="For all the raw SIGMA rulesets (YAML) provided use the generic windows pipeline",
action="store_true",
)
rulesetsFormatsArgs.add_argument(
"-pn",
"--pipeline-null",
help="For all the raw SIGMA rulesets (YAML) don't use pipeline (Default)",
action="store_true",
)
rulesetsFormatsArgs.add_argument(
"-R",
"--rulefilter",
help="Remove rule from ruleset, comparison is done on rule title (case sensitive)",
action="append",
nargs="*",
)
parser.add_argument(
rulesetsFormatsArgs.add_argument(
"-L",
"--limit",
"--limit-results",
help="Discard results (in output file or forwarded events) that are above the provided limit",
type=int,
default=-1,
)
# Zircolite execution related options
parser.add_argument(
"--fieldlist", help="Get all events fields", action="store_true"
# Ouput formats and output files options
outputFormatsArgs = parser.add_argument_group(
"Ouput formats and output files options"
)
parser.add_argument(
"--evtx_dump",
help="Tell Zircolite to use this binary for EVTX conversion, on Linux and MacOS the path must be valid to launch the binary (eg. './evtx_dump' and not 'evtx_dump')",
type=str,
default=None,
)
parser.add_argument(
"--no-recursion",
help="By default Zircolite search recursively, by using this option only the provided directory will be used",
action="store_true",
)
# Output files related options
parser.add_argument(
outputFormatsArgs.add_argument(
"-o",
"--outfile",
help="File that will contains all detected events",
type=str,
default="detected_events.json",
)
parser.add_argument(
outputFormatsArgs.add_argument(
"--csv",
"--csv-output",
help="The output will be in CSV. You should note that in this mode empty fields will not be discarded from results",
action="store_true",
)
parser.add_argument(
outputFormatsArgs.add_argument(
"--csv-delimiter",
help="Choose the delimiter for CSV ouput",
type=str,
default=";",
)
parser.add_argument(
outputFormatsArgs.add_argument(
"-t",
"--tmpdir",
help="Temp directory that will contains events converted as JSON (parent directories must exist)",
type=str,
)
parser.add_argument(
outputFormatsArgs.add_argument(
"-k",
"--keeptmp",
help="Do not remove the temp directory containing events converted in JSON format",
action="store_true",
)
parser.add_argument(
outputFormatsArgs.add_argument(
"--keepflat", help="Save flattened events as JSON", action="store_true"
)
parser.add_argument(
outputFormatsArgs.add_argument(
"-d",
"--dbfile",
help="Save all logs in a SQLite Db to the specified file",
type=str,
)
parser.add_argument(
outputFormatsArgs.add_argument(
"-l", "--logfile", help="Log file name", default="zircolite.log", type=str
)
parser.add_argument(
"-n",
"--nolog",
help="Don't create a log file or a result file (useful when forwarding)",
outputFormatsArgs.add_argument(
"--hashes",
help="Add an xxhash64 of the original log event to each event",
action="store_true",
)
# Forwarding
parser.add_argument(
"--remote",
help="Forward results to a HTTP/Splunk/Elasticsearch, please provide the full address e.g http[s]://address:port[/uri]",
type=str,
)
parser.add_argument(
"--token", help="Use this to provide Splunk HEC Token", type=str
)
parser.add_argument("--index", help="Use this to provide ES index", type=str)
parser.add_argument("--eslogin", help="ES login", type=str, default="")
parser.add_argument("--espass", help="ES password", type=str, default="")
parser.add_argument(
"--stream",
help="By default event forwarding is done at the end, this option activate forwarding events when detected",
action="store_true",
)
parser.add_argument("--forwardall", help="Forward all events", action="store_true")
# Configurations related options
parser.add_argument(
# Advanced configuration options
configFormatsArgs = parser.add_argument_group("Advanced configuration options")
configFormatsArgs.add_argument(
"-c",
"--config",
help="JSON File containing field mappings and exclusions",
type=str,
default="config/fieldMappings.json",
)
parser.add_argument(
"--hashes",
help="Add an xxhash64 of the original log event to each event",
action="store_true",
configFormatsArgs.add_argument(
"--fieldlist", help="Get all events fields", action="store_true"
)
parser.add_argument(
"--timefield",
help="Provide time field name for event forwarding, default is 'SystemTime'",
default="SystemTime",
action="store_true",
)
parser.add_argument(
"--cores",
help="Specify how many cores you want to use, default is all cores, works only for EVTX extraction",
configFormatsArgs.add_argument(
"--evtx_dump",
help="Tell Zircolite to use this binary for EVTX conversion, on Linux and MacOS the path must be valid to launch the binary (eg. './evtx_dump' and not 'evtx_dump')",
type=str,
default=None,
)
parser.add_argument("--debug", help="Activate debug logging", action="store_true")
parser.add_argument(
"--showall",
help="Show all events, useful to check what rule takes takes time to execute",
action="store_true",
)
parser.add_argument(
configFormatsArgs.add_argument(
"--noexternal",
help="Don't use evtx_dump external binaries (slower)",
action="store_true",
)
parser.add_argument(
configFormatsArgs.add_argument(
"--cores",
help="Specify how many cores you want to use, default is all cores, works only for EVTX extraction",
type=str,
)
configFormatsArgs.add_argument(
"--debug", help="Activate debug logging", action="store_true"
)
configFormatsArgs.add_argument(
"--imports", help="Show detailed module import errors", action="store_true"
)
configFormatsArgs.add_argument(
"--showall",
help="Show all events, useful to check what rule takes takes time to execute",
action="store_true",
)
configFormatsArgs.add_argument(
"-n",
"--nolog",
help="Don't create a log file or a result file (useful when forwarding)",
action="store_true",
)
configFormatsArgs.add_argument(
"--ondiskdb",
help="Use an on-disk database instead of the in-memory one (much slower !). Use if your system has limited RAM or if your dataset is very large and you cannot split it.",
type=str,
default=":memory:",
)
parser.add_argument(
configFormatsArgs.add_argument(
"-RE",
"--remove-events",
help="Zircolite will try to remove events/logs submitted if analysis is successful (use at your own risk)",
action="store_true",
)
parser.add_argument(
configFormatsArgs.add_argument(
"-U",
"--update-rules",
help="Update rulesets located in the 'rules' directory",
action="store_true",
)
parser.add_argument(
configFormatsArgs.add_argument(
"-v", "--version", help="Show Zircolite version", action="store_true"
)
# Templating and Mini GUI
parser.add_argument(
# Forwarding options
forwardingFormatsArgs = parser.add_argument_group("Forwarding options")
forwardingFormatsArgs.add_argument(
"--remote",
help="Forward results to a HTTP/Splunk/Elasticsearch, please provide the full address e.g http[s]://address:port[/uri]",
type=str,
)
forwardingFormatsArgs.add_argument(
"--token", help="Use this to provide Splunk HEC Token", type=str
)
forwardingFormatsArgs.add_argument(
"--index", help="Use this to provide ES index", type=str
)
forwardingFormatsArgs.add_argument(
"--eslogin", help="ES login", type=str, default=""
)
forwardingFormatsArgs.add_argument(
"--espass", help="ES password", type=str, default=""
)
forwardingFormatsArgs.add_argument(
"--stream",
help="By default event forwarding is done at the end, this option activate forwarding events when detected",
action="store_true",
)
forwardingFormatsArgs.add_argument(
"--forwardall", help="Forward all events", action="store_true"
)
forwardingFormatsArgs.add_argument(
"--timefield",
help="Provide time field name for event forwarding, default is 'SystemTime'",
default="SystemTime",
action="store_true",
)
# Templating and Mini GUI options
templatingFormatsArgs = parser.add_argument_group("Templating and Mini GUI options")
templatingFormatsArgs.add_argument(
"--template",
help="If a Jinja2 template is specified it will be used to generated output",
type=str,
action="append",
nargs="+",
)
parser.add_argument(
templatingFormatsArgs.add_argument(
"--templateOutput",
help="If a Jinja2 template is specified it will be used to generate a crafted output",
type=str,
action="append",
nargs="+",
)
parser.add_argument(
"--package",
help="Create a ZircoGui package (not available in embedded mode)",
action="store_true",
templatingFormatsArgs.add_argument(
"--package", help="Create a ZircoGui package", action="store_true"
)
args = parser.parse_args()
@@ -1857,6 +2198,15 @@ def main():
"""
)
# Show imports status
importsMessage, args, mustQuit = ImportErrorHandler(args)
if importsMessage != "":
consoleLogger.info(f"[+] Modules imports status: \n{importsMessage}")
else:
consoleLogger.info("[+] Modules imports status: OK")
if mustQuit:
sys.exit(1)
# Print version an quit
if args.version:
consoleLogger.info(f"Zircolite - v{version}"), sys.exit(0)
@@ -1871,7 +2221,7 @@ def main():
if args.ruleset:
args.ruleset = [item for args in args.ruleset for item in args]
else:
args.ruleset = ["rules/rules_windows_generic.json"]
args.ruleset = ["rules/rules_windows_generic_pysigma.json"]
# Check mandatory CLI options
if not args.evtx:
@@ -1894,16 +2244,17 @@ def main():
consoleLogger.info("[+] Checking prerequisites")
# Init Forwarding
forwarder = eventForwarder(
remote=args.remote,
timeField=args.timefield,
token=args.token,
logger=consoleLogger,
index=args.index,
login=args.eslogin,
password=args.espass,
)
forwarder = None
if args.remote is not None:
forwarder = eventForwarder(
remote=args.remote,
timeField=args.timefield,
token=args.token,
logger=consoleLogger,
index=args.index,
login=args.eslogin,
password=args.espass,
)
if not forwarder.networkCheck():
quitOnError(
f"{Fore.RED} [-] Remote host cannot be reached : {args.remote}{Fore.RESET}",
@@ -1920,15 +2271,6 @@ def main():
consoleLogger,
)
binPath = args.evtx_dump
# Check ruleset arg
for ruleset in args.ruleset:
checkIfExists(
ruleset,
f"{Fore.RED} [-] Cannot find ruleset : {ruleset}. Default rulesets are available here : https://github.com/wagga40/Zircolite-Rules{Fore.RESET}",
consoleLogger,
)
# Check templates args
readyForTemplating = False
if args.template is not None:
@@ -2033,7 +2375,7 @@ def main():
providedTmpDir=args.tmpdir,
coreCount=args.cores,
useExternalBinaries=(not args.noexternal),
binPath=binPath,
binPath=args.evtx_dump,
xmlLogs=args.xml,
sysmon4linux=args.sysmon4linux,
auditdLogs=args.auditd,
@@ -2058,7 +2400,8 @@ def main():
)
if LogJSONList == []:
quitOnError(
f"{Fore.RED} [-] No JSON files found.{Fore.RESET}", consoleLogger
f"{Fore.RED} [-] No files containing logs found.{Fore.RESET}",
consoleLogger,
)
# Print field list and exit
@@ -2099,9 +2442,11 @@ def main():
args.rulefilter = [item for sublist in args.rulefilter for item in sublist]
writeMode = "w"
for ruleset in args.ruleset:
consoleLogger.info(f"[+] Loading ruleset from : {ruleset}")
zircoliteCore.loadRulesetFromFile(filename=ruleset, ruleFilters=args.rulefilter)
consoleLogger.info("[+] Loading ruleset(s)")
rulesetsManager = rulesetHandler(consoleLogger, args)
for ruleset in rulesetsManager.Rulesets:
zircoliteCore.loadRulesetFromVar(ruleset=ruleset, ruleFilters=args.rulefilter)
if args.limit > 0:
consoleLogger.info(
f"[+] Limited mode : detections with more than {args.limit} events will be discarded"

View File

@@ -4,6 +4,7 @@
import argparse
import asyncio
import csv
import functools
import hashlib
import logging
import multiprocessing as mp
@@ -22,21 +23,63 @@ from pathlib import Path
from sqlite3 import Error
from sys import platform as _platform
# External libs
import aiohttp
# External libs (Mandatory)
import orjson as json
import requests
import urllib3
import xxhash
from colorama import Fore
from elasticsearch import AsyncElasticsearch
from evtx import PyEvtxParser
from jinja2 import Template
from lxml import etree
from tqdm import tqdm
from tqdm.asyncio import tqdm as tqdmAsync
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# External libs (Optional)
forwardingDisabled = False
try:
import aiohttp
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
except ImportError:
forwardingDisabled = True
elasticForwardingDisabled = False
try:
from elasticsearch import AsyncElasticsearch
except ImportError:
elasticForwardingDisabled = True
updateDisabled = False
try:
import requests
except ImportError:
forwardingDisabled = True
updateDisabled = True
sigmaConversionDisabled = False
try:
from sigma.collection import SigmaCollection
from sigma.backends.sqlite import sqlite
from sigma.pipelines.sysmon import sysmon_pipeline
from sigma.pipelines.windows import windows_logsource_pipeline, windows_audit_pipeline
from sigma.processing.resolver import ProcessingPipelineResolver
import yaml
except ImportError:
sigmaConversionDisabled = True
pyevtxDisabled = False
try:
from evtx import PyEvtxParser
except ImportError:
pyevtxDisabled = True
jinja2Disabled = False
try:
from jinja2 import Template
except ImportError:
jinja2Disabled = True
xmlImportDisabled = False
try:
from lxml import etree
except ImportError:
xmlImportDisabled = True
def signal_handler(sig, frame):
print("[-] Execution interrupted !")
@@ -653,7 +696,7 @@ class zirCore:
self.ruleset = json.loads(f.read())
self.applyRulesetFilters(ruleFilters)
except Exception as e:
self.logger.error(f"{Fore.RED} [-] Load JSON ruleset failed, are you sure it is a valid JSON file ? : {e}{Fore.RESET}")
self.logger.error(f"{Fore.RED} [-] Loading JSON ruleset failed, are you sure it is a valid JSON file ? : {e}{Fore.RESET}")
def loadRulesetFromVar(self, ruleset, ruleFilters):
self.ruleset = ruleset
@@ -767,11 +810,6 @@ class evtxExtractor:
    def randString(self):
        """Return an 8-character random string of uppercase letters and digits (cryptographically sourced)."""
        return ''.join(random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(8))
def makeExecutable(self, path):
mode = os.stat(path).st_mode
mode |= (mode & 0o444) >> 2
os.chmod(path, mode)
def getOSExternalTools(self, binPath):
""" Determine which binaries to run depending on host OS : 32Bits is NOT supported for now since evtx_dump is 64bits only"""
if binPath is None:
@@ -789,15 +827,18 @@ class evtxExtractor:
Convert EVTX to JSON using evtx_dump bindings (slower)
Drop resulting JSON files in a tmp folder.
"""
try:
filepath = Path(file)
filename = filepath.name
parser = PyEvtxParser(str(filepath))
with open(f"{self.tmpDir}/{str(filename)}-{self.randString()}.json", "w", encoding="utf-8") as f:
for record in parser.records_json():
f.write(f'{json.dumps(json.loads(record["data"])).decode("utf-8")}\n')
except Exception as e:
self.logger.error(f"{Fore.RED} [-] {e}{Fore.RESET}")
if not self.useExternalBinaries:
try:
filepath = Path(file)
filename = filepath.name
parser = PyEvtxParser(str(filepath))
with open(f"{self.tmpDir}/{str(filename)}-{self.randString()}.json", "w", encoding="utf-8") as f:
for record in parser.records_json():
f.write(f'{json.dumps(json.loads(record["data"])).decode("utf-8")}\n')
except Exception as e:
self.logger.error(f"{Fore.RED} [-] Cannot use PyEvtxParser{Fore.RESET}")
else:
self.logger.error(f"{Fore.RED} [-] Cannot use PyEvtxParser and evtx_dump is disabled or missing{Fore.RESET}")
def getTime(self, line):
timestamp = line.replace('msg=audit(','').replace('):','').split(':')
@@ -1092,6 +1133,135 @@ class rulesUpdater:
except Exception as e:
self.logger.error(f" [-] {e}")
class rulesetHandler:
    """Load, convert and optionally combine detection rulesets.

    Accepts both ready-to-use Zircolite JSON rulesets and native SIGMA YAML
    rules/directories (converted on the fly with pySigma + the SQLite
    backend). The final result is stored in ``self.Rulesets``: a list of
    rulesets, each being a list of rule dicts sorted by their 'level' key.
    """

    def __init__(self, logger=None, config=None):
        self.logger = logger or logging.getLogger(__name__)
        self.saveRuleset = config.save_ruleset
        self.rulesetPathList = config.ruleset
        self.cores = config.cores or os.cpu_count()
        self.sigmaConversionDisabled = config.no_sigma_conversion
        if self.sigmaConversionDisabled:
            self.logger.info(f"{Fore.LIGHTYELLOW_EX} [i] SIGMA conversion is disabled ! {Fore.RESET}")
        # Resolving pipelines (mutually exclusive; default is no pipeline)
        if config.pipeline_sysmon:
            self.pipelines = [sysmon_pipeline(), windows_logsource_pipeline()]
        elif config.pipeline_windows:
            self.pipelines = [windows_audit_pipeline(), windows_logsource_pipeline()]
        else:
            self.pipelines = []
        # Parse & (if necessary) convert rulesets, final list is stored in self.Rulesets
        self.Rulesets = self.rulesetParsing()
        # Combining all rulesets into a single one when asked to
        if config.combine_ruleset:
            self.Rulesets = [item for subRuleset in self.Rulesets if subRuleset for item in subRuleset]
            self.Rulesets = [sorted(self.Rulesets, key=lambda d: d['level'])]  # Sorting by level
        if all(not subRuleset for subRuleset in self.Rulesets):
            self.logger.error(f"{Fore.RED} [-] No rules to execute !{Fore.RESET}")

    def isYAML(self, filepath):
        """Return True if *filepath* has a YAML suffix and parses as YAML.

        Returns False for other suffixes or unparsable content (the original
        implicitly returned None for non-YAML suffixes).
        """
        if filepath.suffix not in (".yml", ".yaml"):
            return False
        with open(filepath, 'r') as file:
            content = file.read()
        try:
            yaml.safe_load(content)
            return True
        except yaml.YAMLError:
            return False

    def isJSON(self, filepath):
        """Return True if *filepath* has a '.json' suffix and contains valid JSON."""
        if filepath.suffix != ".json":
            return False
        with open(filepath, 'r') as file:
            content = file.read()
        try:
            json.loads(content)
            return True
        except json.JSONDecodeError:
            return False

    def randRulesetName(self, sigmaRules):
        """Build a filesystem-safe, randomized output filename for a converted ruleset."""
        # Replace every non-alphanumeric character by '-' and collapse runs of '-'
        cleanedName = ''.join(char if char.isalnum() else '-' for char in sigmaRules).strip('-')
        cleanedName = re.sub(r'-+', '-', cleanedName)
        # Random suffix avoids clobbering a previous conversion of the same ruleset
        randomString = ''.join(random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(8))
        return f"ruleset-{cleanedName}-{randomString}.json"

    def convertSigmaRules(self, backend, rule):
        """Convert a single SIGMA rule with *backend*; return None on failure.

        Failures are logged at debug level only; callers filter out the Nones.
        """
        try:
            return backend.convert_rule(rule, "zircolite")[0]
        except Exception as e:
            self.logger.debug(f"{Fore.RED} [-] Cannot convert rule '{str(rule)}' : {e}{Fore.RESET}")
            return None

    def sigmaRulesToRuleset(self, SigmaRulesList, pipelines):
        """Convert SIGMA YAML rules (files or directories) to a Zircolite ruleset.

        Conversion is parallelized across ``self.cores`` processes. The
        resulting ruleset (list of rule dicts, sorted by level) is returned
        and optionally written to disk when ``--save-ruleset`` was given.
        """
        ruleset = []
        for sigmaRules in SigmaRulesList:
            # Build a single resolved pipeline from the requested pipeline classes
            piperesolver = ProcessingPipelineResolver()
            for pipeline in pipelines:
                piperesolver.add_pipeline_class(pipeline)
            # Create a single sorted and prioritized pipeline
            combined_pipeline = piperesolver.resolve(piperesolver.pipelines)
            # Instantiate the SQLite backend using the resolved pipeline
            sqlite_backend = sqlite.sqliteBackend(combined_pipeline)
            rules = Path(sigmaRules)
            if rules.is_dir():
                rule_list = list(rules.rglob("*.yml")) + list(rules.rglob("*.yaml"))
            else:
                rule_list = [rules]
            rule_collection = SigmaCollection.load_ruleset(rule_list)
            pool = mp.Pool(self.cores)
            try:
                ruleset = pool.map(functools.partial(self.convertSigmaRules, sqlite_backend), tqdm(rule_collection, colour="yellow"))
            finally:
                pool.close()
                pool.join()
            ruleset = [rule for rule in ruleset if rule is not None]  # Removing failed conversions
            ruleset = sorted(ruleset, key=lambda d: d['level'])  # Sorting by level
            if self.saveRuleset:
                # json is orjson here: dumps() returns bytes and takes 'option'
                with open(self.randRulesetName(str(sigmaRules)), 'w') as outfile:
                    outfile.write(json.dumps(ruleset, option=json.OPT_INDENT_2).decode('utf-8'))
        return ruleset

    def rulesetParsing(self):
        """Parse every path in ``self.rulesetPathList`` into a list of rulesets.

        JSON files are loaded directly; YAML files and directories are
        converted from native SIGMA (unless conversion is disabled).
        Nonexistent paths are now reported instead of silently skipped.
        """
        rulesetList = []
        for ruleset in self.rulesetPathList:
            rulesetPath = Path(ruleset)
            if not rulesetPath.exists():
                self.logger.error(f"{Fore.RED} [-] Cannot find ruleset : {str(rulesetPath)}{Fore.RESET}")
                continue
            if rulesetPath.is_file():
                if self.isJSON(rulesetPath):  # JSON / Zircolite ruleset
                    try:
                        with open(rulesetPath, encoding='utf-8') as f:
                            rulesetList.append(json.loads(f.read()))
                        self.logger.info(f"{Fore.CYAN} [+] Loaded JSON/Zircolite ruleset : {str(rulesetPath)}{Fore.RESET}")
                    except Exception as e:
                        self.logger.error(f"{Fore.RED} [-] Cannot load {str(rulesetPath)} {e}{Fore.RESET}")
                elif self.isYAML(rulesetPath) and not self.sigmaConversionDisabled:  # YAML ruleset
                    try:
                        self.logger.info(f"{Fore.CYAN} [+] Converting Native SIGMA to Zircolite ruleset : {str(rulesetPath)}{Fore.RESET}")
                        rulesetList.append(self.sigmaRulesToRuleset([rulesetPath], self.pipelines))
                    except Exception as e:
                        self.logger.error(f"{Fore.RED} [-] Cannot convert {str(rulesetPath)} {e}{Fore.RESET}")
            elif rulesetPath.is_dir() and not self.sigmaConversionDisabled:  # Directory of SIGMA rules
                try:
                    self.logger.info(f"{Fore.CYAN} [+] Converting Native SIGMA to Zircolite ruleset : {str(rulesetPath)}{Fore.RESET}")
                    rulesetList.append(self.sigmaRulesToRuleset([rulesetPath], self.pipelines))
                except Exception as e:
                    self.logger.error(f"{Fore.RED} [-] Cannot convert {str(rulesetPath)} {e}{Fore.RESET}")
        return rulesetList
def selectFiles(pathList, selectFilesList):
    """Keep only the paths whose string form contains one of the filters.

    pathList: iterable of paths (anything str()-able).
    selectFilesList: list of [filter, ...] lists (argparse append+nargs shape);
    only the first element of each inner list is used, case-insensitively.
    Bug fix: when no filter is provided, return pathList unchanged instead of
    implicitly returning None (mirrors the sibling avoidFiles()).
    """
    if selectFilesList is not None:
        return [evtx for evtx in [str(element) for element in list(pathList)] if any(fileFilters[0].lower() in evtx.lower() for fileFilters in selectFilesList)]
    return pathList
@@ -1102,75 +1272,123 @@ def avoidFiles(pathList, avoidFilesList):
return [evtx for evtx in [str(element) for element in list(pathList)] if all(fileFilters[0].lower() not in evtx.lower() for fileFilters in avoidFilesList)]
return pathList
def ImportErrorHandler(config):
    """Report failed optional imports and disable the matching features.

    Inspects the module-level *Disabled flags set by the conditional imports
    at the top of the file, neutralizes the corresponding config options and
    builds a human-readable summary.

    Returns a 3-tuple (message, config, mustQuit):
      - message: summary of disabled features ("" when nothing to report),
      - config: the (possibly adjusted) argparse namespace,
      - mustQuit: True only when a hard requirement is missing (lxml absent
        while --xml was requested).

    Bug fix: the final return previously yielded a 2-tuple, which made the
    caller's 3-value unpacking raise ValueError whenever imports failed
    without --debug/--imports.
    """
    importErrorList = []
    if forwardingDisabled:
        importErrorList.append(f"{Fore.LIGHTYELLOW_EX} [i] Cannot import 'aiohttp' or 'urllib3' or 'requests', events forwarding is disabled{Fore.RESET}")
        config.remote = None
    if elasticForwardingDisabled:
        importErrorList.append(f"{Fore.LIGHTYELLOW_EX} [i] Cannot import 'elasticsearch[async]', events forwarding to Elastic is disabled{Fore.RESET}")
        config.index = None
    if updateDisabled:
        importErrorList.append(f"{Fore.LIGHTYELLOW_EX} [i] Cannot import 'requests', events update is disabled{Fore.RESET}")
        config.update_rules = False
    if sigmaConversionDisabled:
        importErrorList.append(f"{Fore.LIGHTYELLOW_EX} [i] Cannot import 'sigma' from pySigma, ruleset conversion YAML -> JSON is disabled{Fore.RESET}")
        config.no_sigma_conversion = True
    if pyevtxDisabled:
        importErrorList.append(f"{Fore.LIGHTYELLOW_EX} [i] Cannot import 'evtx' from pyevtx-rs, use of external binaries is mandatory{Fore.RESET}")
        config.noexternal = False  # external evtx_dump becomes mandatory
    if jinja2Disabled:
        importErrorList.append(f"{Fore.LIGHTYELLOW_EX} [i] Cannot import 'jinja2', templating is disabled{Fore.RESET}")
        config.template = None
    if xmlImportDisabled:
        importErrorList.append(f"{Fore.LIGHTYELLOW_EX} [i] Cannot import 'lxml', cannot use XML logs as input{Fore.RESET}")
        if config.xml:
            # XML input was explicitly requested but cannot be honored: abort.
            return f"{Fore.RED} [-] Cannot import 'lxml', but according to command line provided it is needed{Fore.RESET}", config, True
    if config.debug or config.imports:
        return "\n".join(importErrorList), config, False
    if not importErrorList:
        return "", config, False
    return f"{Fore.LIGHTYELLOW_EX} [i] Import errors, functions can be disabled ('--imports' for details){Fore.RESET}", config, False
################################################################
# MAIN()
################################################################
def main():
version = "2.10.0"
version = "2.20.0"
# Init Args handling
parser = argparse.ArgumentParser()
# Input logs and input files filters
parser.add_argument("-e", "--evtx", "--events", help="Log file or directory where log files are stored in JSON, Auditd, Sysmon for Linux, or EVTX format", type=str)
parser.add_argument("-s", "--select", help="Only files containing the provided string will be used. If there is/are exclusion(s) (--avoid) they will be handled after selection", action='append', nargs='+')
parser.add_argument("-a", "--avoid", help="EVTX files containing the provided string will NOT be used", action='append', nargs='+')
parser.add_argument("-f", "--fileext", help="Extension of the log files", type=str)
parser.add_argument("-fp", "--file-pattern", help="Use a Python Glob pattern to select files. This option only works with directories", type=str)
# Event filtering related options
parser.add_argument("-A", "--after", help="Limit to events that happened after the provided timestamp (UTC). Format : 1970-01-01T00:00:00", type=str, default="1970-01-01T00:00:00")
parser.add_argument("-B", "--before", help="Limit to events that happened before the provided timestamp (UTC). Format : 1970-01-01T00:00:00", type=str, default="9999-12-12T23:59:59")
# Input logs format related options
parser.add_argument("-j", "--jsononly", "--jsonline", "--jsonl", "--json-input", help="If logs files are already in JSON lines format ('jsonl' in evtx_dump) ", action='store_true')
parser.add_argument("--jsonarray", "--json-array", "--json-array-input", help="Source logs are in JSON but as an array", action='store_true')
parser.add_argument("-D", "--dbonly", "--db-input", help="Directly use a previously saved database file, timerange filters will not work", action='store_true')
parser.add_argument("-S", "--sysmon4linux", "--sysmon-linux", "--sysmon-linux-input", help="Use this option if your log file is a Sysmon for linux log file, default file extension is '.log'", action='store_true')
parser.add_argument("-AU", "--auditd", help="Use this option if your log file is a Auditd log file, default file extension is '.log'", action='store_true')
parser.add_argument("-x", "--xml", help="Use this option if your log file is a EVTX converted to XML log file, default file extension is '.xml'", action='store_true')
parser.add_argument("--evtxtract", help="Use this option if your log file was extracted with EVTXtract, default file extension is '.log'", action='store_true')
parser.add_argument("--csvonly" ,"--csv-input", help="Use this option if your log file was extracted with EVTXtract, default file extension is '.log'", action='store_true')
parser.add_argument("-LE", "--logs-encoding", help="Specify log encoding when dealing with Sysmon for Linux or Auditd files", type=str)
# Ruleset related options
parser.add_argument("-r", "--ruleset", help="JSON File containing SIGMA rules", action='append', nargs='+')
parser.add_argument("-R", "--rulefilter", help="Remove rule from ruleset, comparison is done on rule title (case sensitive)", action='append', nargs='*')
parser.add_argument("-L", "--limit", help="Discard results (in output file or forwarded events) that are above the provided limit", type=int, default=-1)
# Zircolite execution related options
parser.add_argument("--fieldlist", help="Get all events fields", action='store_true')
parser.add_argument("--evtx_dump", help="Tell Zircolite to use this binary for EVTX conversion, on Linux and MacOS the path must be valid to launch the binary (eg. './evtx_dump' and not 'evtx_dump')", type=str, default=None)
parser.add_argument("--no-recursion", help="By default Zircolite search recursively, by using this option only the provided directory will be used", action="store_true")
# Output files related options
parser.add_argument("-o", "--outfile", help="File that will contains all detected events", type=str, default="detected_events.json")
parser.add_argument("--csv", help="The output will be in CSV. You should note that in this mode empty fields will not be discarded from results", action='store_true')
parser.add_argument("--csv-delimiter", help="Choose the delimiter for CSV ouput", type=str, default=";")
parser.add_argument("-t", "--tmpdir", help="Temp directory that will contains events converted as JSON (parent directories must exist)", type=str)
parser.add_argument("-k", "--keeptmp", help="Do not remove the temp directory containing events converted in JSON format", action='store_true')
parser.add_argument("--keepflat", help="Save flattened events as JSON", action='store_true')
parser.add_argument("-d", "--dbfile", help="Save all logs in a SQLite Db to the specified file", type=str)
parser.add_argument("-l", "--logfile", help="Log file name", default="zircolite.log", type=str)
parser.add_argument("-n", "--nolog", help="Don't create a log file or a result file (useful when forwarding)", action='store_true')
# Forwarding
parser.add_argument("--remote", help="Forward results to a HTTP/Splunk/Elasticsearch, please provide the full address e.g http[s]://address:port[/uri]", type=str)
parser.add_argument("--token", help="Use this to provide Splunk HEC Token", type=str)
parser.add_argument("--index", help="Use this to provide ES index", type=str)
parser.add_argument("--eslogin", help="ES login", type=str, default="")
parser.add_argument("--espass", help="ES password", type=str, default="")
parser.add_argument("--stream", help="By default event forwarding is done at the end, this option activate forwarding events when detected", action="store_true")
parser.add_argument("--forwardall", help="Forward all events", action="store_true")
# Configurations related options
parser.add_argument("-c", "--config", help="JSON File containing field mappings and exclusions", type=str, default="config/fieldMappings.json")
parser.add_argument("--hashes", help="Add an xxhash64 of the original log event to each event", action='store_true')
parser.add_argument("--timefield", help="Provide time field name for event forwarding, default is 'SystemTime'", default="SystemTime", action="store_true")
parser.add_argument("--cores", help="Specify how many cores you want to use, default is all cores, works only for EVTX extraction", type=str)
parser.add_argument("--debug", help="Activate debug logging", action='store_true')
parser.add_argument("--showall", help="Show all events, useful to check what rule takes takes time to execute", action='store_true')
parser.add_argument("--noexternal", help="Don't use evtx_dump external binaries (slower)", action='store_true')
parser.add_argument("--ondiskdb", help="Use an on-disk database instead of the in-memory one (much slower !). Use if your system has limited RAM or if your dataset is very large and you cannot split it.", type=str, default=":memory:")
parser.add_argument("-RE", "--remove-events", help="Zircolite will try to remove events/logs submitted if analysis is successful (use at your own risk)", action='store_true')
parser.add_argument("-U", "--update-rules", help="Update rulesets located in the 'rules' directory", action='store_true')
parser.add_argument("-v", "--version", help="Show Zircolite version", action='store_true')
# Templating and Mini GUI
parser.add_argument("--template", help="If a Jinja2 template is specified it will be used to generated output", type=str, action='append', nargs='+')
parser.add_argument("--templateOutput", help="If a Jinja2 template is specified it will be used to generate a crafted output", type=str, action='append', nargs='+')
parser.add_argument("--package", help="Create a ZircoGui package (not available in embedded mode)", action='store_true')
# Input files and filtering/selection options
logsInputArgs = parser.add_argument_group('Input files and filtering/selection options')
logsInputArgs.add_argument("-e", "--evtx", "--events", help="Log file or directory where log files are stored in JSON, Auditd, Sysmon for Linux, or EVTX format", type=str)
logsInputArgs.add_argument("-s", "--select", help="Only files containing the provided string will be used. If there is/are exclusion(s) (--avoid) they will be handled after selection", action='append', nargs='+')
logsInputArgs.add_argument("-a", "--avoid", help="EVTX files containing the provided string will NOT be used", action='append', nargs='+')
logsInputArgs.add_argument("-f", "--fileext", help="Extension of the log files", type=str)
logsInputArgs.add_argument("-fp", "--file-pattern", help="Use a Python Glob pattern to select files. This option only works with directories", type=str)
logsInputArgs.add_argument("--no-recursion", help="By default Zircolite search log/event files recursively, by using this option only the provided directory will be used", action="store_true")
# Events filtering options
eventArgs = parser.add_argument_group('Events filtering options')
eventArgs.add_argument("-A", "--after", help="Limit to events that happened after the provided timestamp (UTC). Format : 1970-01-01T00:00:00", type=str, default="1970-01-01T00:00:00")
eventArgs.add_argument("-B", "--before", help="Limit to events that happened before the provided timestamp (UTC). Format : 1970-01-01T00:00:00", type=str, default="9999-12-12T23:59:59")
# Event and log formats options
eventFormatsArgs = parser.add_argument_group('Event and log formats options')
eventFormatsArgs.add_argument("-j", "--jsononly", "--jsonline", "--jsonl", "--json-input", help="If logs files are already in JSON lines format ('jsonl' in evtx_dump) ", action='store_true')
eventFormatsArgs.add_argument("--jsonarray", "--json-array", "--json-array-input", help="Source logs are in JSON but as an array", action='store_true')
eventFormatsArgs.add_argument("-D", "--dbonly", "--db-input", help="Directly use a previously saved database file, timerange filters will not work", action='store_true')
eventFormatsArgs.add_argument("-S", "--sysmon4linux", "--sysmon-linux", "--sysmon-linux-input", help="Use this option if your log file is a Sysmon for linux log file, default file extension is '.log'", action='store_true')
eventFormatsArgs.add_argument("-AU", "--auditd", "--auditd-input", help="Use this option if your log file is a Auditd log file, default file extension is '.log'", action='store_true')
eventFormatsArgs.add_argument("-x", "--xml", "--xml-input", help="Use this option if your log file is a EVTX converted to XML log file, default file extension is '.xml'", action='store_true')
eventFormatsArgs.add_argument("--evtxtract", help="Use this option if your log file was extracted with EVTXtract, default file extension is '.log'", action='store_true')
eventFormatsArgs.add_argument("--csvonly" ,"--csv-input", help="Use this option if your log file was extracted with EVTXtract, default file extension is '.log'", action='store_true')
eventFormatsArgs.add_argument("-LE", "--logs-encoding", help="Specify log encoding when dealing with Sysmon for Linux or Auditd files", type=str)
# Ruleset options
rulesetsFormatsArgs = parser.add_argument_group('Event and log formats options')
rulesetsFormatsArgs.add_argument("-r", "--ruleset", help="JSON File containing SIGMA rules (Zircolite ruleset)", action='append', nargs='+')
rulesetsFormatsArgs.add_argument("-nsc", "--no-sigma-conversion", help=argparse.SUPPRESS, action='store_true')
rulesetsFormatsArgs.add_argument("-cr", "--combine-ruleset", help="JSON File containing SIGMA rules (Zircolite ruleset)", action='store_true')
rulesetsFormatsArgs.add_argument("-sr", "--save-ruleset", help="Save converted ruleset (SIGMA to Zircolite format) to disk", action='store_true')
rulesetsFormatsArgs.add_argument("-ps", "--pipeline-sysmon", help="For all the raw SIGMA rulesets (YAML) provided use the sysmon pipeline", action='store_true')
rulesetsFormatsArgs.add_argument("-pw", "--pipeline-windows", help="For all the raw SIGMA rulesets (YAML) provided use the generic windows pipeline", action='store_true')
rulesetsFormatsArgs.add_argument("-pn", "--pipeline-null", help="For all the raw SIGMA rulesets (YAML) don't use pipeline (Default)", action='store_true')
rulesetsFormatsArgs.add_argument("-R", "--rulefilter", help="Remove rule from ruleset, comparison is done on rule title (case sensitive)", action='append', nargs='*')
rulesetsFormatsArgs.add_argument("-L", "--limit", "--limit-results", help="Discard results (in output file or forwarded events) that are above the provided limit", type=int, default=-1)
# Ouput formats and output files options
outputFormatsArgs = parser.add_argument_group('Ouput formats and output files options')
outputFormatsArgs.add_argument("-o", "--outfile", help="File that will contains all detected events", type=str, default="detected_events.json")
outputFormatsArgs.add_argument("--csv", "--csv-output", help="The output will be in CSV. You should note that in this mode empty fields will not be discarded from results", action='store_true')
outputFormatsArgs.add_argument("--csv-delimiter", help="Choose the delimiter for CSV ouput", type=str, default=";")
outputFormatsArgs.add_argument("-t", "--tmpdir", help="Temp directory that will contains events converted as JSON (parent directories must exist)", type=str)
outputFormatsArgs.add_argument("-k", "--keeptmp", help="Do not remove the temp directory containing events converted in JSON format", action='store_true')
outputFormatsArgs.add_argument("--keepflat", help="Save flattened events as JSON", action='store_true')
outputFormatsArgs.add_argument("-d", "--dbfile", help="Save all logs in a SQLite Db to the specified file", type=str)
outputFormatsArgs.add_argument("-l", "--logfile", help="Log file name", default="zircolite.log", type=str)
outputFormatsArgs.add_argument("--hashes", help="Add an xxhash64 of the original log event to each event", action='store_true')
# Advanced configuration options
configFormatsArgs = parser.add_argument_group('Advanced configuration options')
configFormatsArgs.add_argument("-c", "--config", help="JSON File containing field mappings and exclusions", type=str, default="config/fieldMappings.json")
configFormatsArgs.add_argument("--fieldlist", help="Get all events fields", action='store_true')
configFormatsArgs.add_argument("--evtx_dump", help="Tell Zircolite to use this binary for EVTX conversion, on Linux and MacOS the path must be valid to launch the binary (eg. './evtx_dump' and not 'evtx_dump')", type=str, default=None)
configFormatsArgs.add_argument("--noexternal", help="Don't use evtx_dump external binaries (slower)", action='store_true')
configFormatsArgs.add_argument("--cores", help="Specify how many cores you want to use, default is all cores, works only for EVTX extraction", type=str)
configFormatsArgs.add_argument("--debug", help="Activate debug logging", action='store_true')
configFormatsArgs.add_argument("--imports", help="Show detailed module import errors", action='store_true')
configFormatsArgs.add_argument("--showall", help="Show all events, useful to check what rule takes takes time to execute", action='store_true')
configFormatsArgs.add_argument("-n", "--nolog", help="Don't create a log file or a result file (useful when forwarding)", action='store_true')
configFormatsArgs.add_argument("--ondiskdb", help="Use an on-disk database instead of the in-memory one (much slower !). Use if your system has limited RAM or if your dataset is very large and you cannot split it.", type=str, default=":memory:")
configFormatsArgs.add_argument("-RE", "--remove-events", help="Zircolite will try to remove events/logs submitted if analysis is successful (use at your own risk)", action='store_true')
configFormatsArgs.add_argument("-U", "--update-rules", help="Update rulesets located in the 'rules' directory", action='store_true')
configFormatsArgs.add_argument("-v", "--version", help="Show Zircolite version", action='store_true')
# Forwarding options
forwardingFormatsArgs = parser.add_argument_group('Forwarding options')
forwardingFormatsArgs.add_argument("--remote", help="Forward results to a HTTP/Splunk/Elasticsearch, please provide the full address e.g http[s]://address:port[/uri]", type=str)
forwardingFormatsArgs.add_argument("--token", help="Use this to provide Splunk HEC Token", type=str)
forwardingFormatsArgs.add_argument("--index", help="Use this to provide ES index", type=str)
forwardingFormatsArgs.add_argument("--eslogin", help="ES login", type=str, default="")
forwardingFormatsArgs.add_argument("--espass", help="ES password", type=str, default="")
forwardingFormatsArgs.add_argument("--stream", help="By default event forwarding is done at the end, this option activate forwarding events when detected", action="store_true")
forwardingFormatsArgs.add_argument("--forwardall", help="Forward all events", action="store_true")
forwardingFormatsArgs.add_argument("--timefield", help="Provide time field name for event forwarding, default is 'SystemTime'", default="SystemTime", action="store_true")
# Templating and Mini GUI options
templatingFormatsArgs = parser.add_argument_group('Templating and Mini GUI options')
templatingFormatsArgs.add_argument("--template", help="If a Jinja2 template is specified it will be used to generated output", type=str, action='append', nargs='+')
templatingFormatsArgs.add_argument("--templateOutput", help="If a Jinja2 template is specified it will be used to generate a crafted output", type=str, action='append', nargs='+')
templatingFormatsArgs.add_argument("--package", help="Create a ZircoGui package", action='store_true')
args = parser.parse_args()
signal.signal(signal.SIGINT, signal_handler)
@@ -1190,6 +1408,15 @@ def main():
-= Standalone SIGMA Detection tool for EVTX/Auditd/Sysmon Linux =-
""")
# Show imports status
importsMessage, args, mustQuit = ImportErrorHandler(args)
if importsMessage != "":
consoleLogger.info(f"[+] Modules imports status: \n{importsMessage}")
else:
consoleLogger.info("[+] Modules imports status: OK")
if mustQuit:
sys.exit(1)
# Print version an quit
if args.version:
consoleLogger.info(f"Zircolite - v{version}"), sys.exit(0)
@@ -1204,7 +1431,7 @@ def main():
if args.ruleset:
args.ruleset = [item for args in args.ruleset for item in args]
else:
args.ruleset = ["rules/rules_windows_generic.json"]
args.ruleset = ["rules/rules_windows_generic_pysigma.json"]
# Check mandatory CLI options
if not args.evtx:
@@ -1219,8 +1446,9 @@ def main():
consoleLogger.info("[+] Checking prerequisites")
# Init Forwarding
forwarder = eventForwarder(remote=args.remote, timeField=args.timefield, token=args.token, logger=consoleLogger, index=args.index, login=args.eslogin, password=args.espass)
forwarder = None
if args.remote is not None:
forwarder = eventForwarder(remote=args.remote, timeField=args.timefield, token=args.token, logger=consoleLogger, index=args.index, login=args.eslogin, password=args.espass)
if not forwarder.networkCheck():
quitOnError(f"{Fore.RED} [-] Remote host cannot be reached : {args.remote}{Fore.RESET}", consoleLogger)
@@ -1231,11 +1459,6 @@ def main():
except Exception:
quitOnError(f"{Fore.RED} [-] Wrong timestamp format. Please use 'AAAA-MM-DDTHH:MM:SS'", consoleLogger)
binPath = args.evtx_dump
# Check ruleset arg
for ruleset in args.ruleset:
checkIfExists(ruleset, f"{Fore.RED} [-] Cannot find ruleset : {ruleset}. Default rulesets are available here : https://github.com/wagga40/Zircolite-Rules{Fore.RESET}", consoleLogger)
# Check templates args
readyForTemplating = False
if (args.template is not None):
@@ -1302,7 +1525,7 @@ def main():
if not args.jsononly and not args.jsonarray:
# Init EVTX extractor object
extractor = evtxExtractor(logger=consoleLogger, providedTmpDir=args.tmpdir, coreCount=args.cores, useExternalBinaries=(not args.noexternal), binPath=binPath, xmlLogs=args.xml, sysmon4linux=args.sysmon4linux, auditdLogs=args.auditd, evtxtract=args.evtxtract, encoding=args.logs_encoding, csvInput=args.csvonly)
extractor = evtxExtractor(logger=consoleLogger, providedTmpDir=args.tmpdir, coreCount=args.cores, useExternalBinaries=(not args.noexternal), binPath=args.evtx_dump, xmlLogs=args.xml, sysmon4linux=args.sysmon4linux, auditdLogs=args.auditd, evtxtract=args.evtxtract, encoding=args.logs_encoding, csvInput=args.csvonly)
consoleLogger.info(f"[+] Extracting events Using '{extractor.tmpDir}' directory ")
for evtx in tqdm(FileList, colour="yellow"):
extractor.run(evtx)
@@ -1313,7 +1536,7 @@ def main():
checkIfExists(args.config, f"{Fore.RED} [-] Cannot find mapping file, you can get the default one here : https://github.com/wagga40/Zircolite/blob/master/config/fieldMappings.json {Fore.RESET}", consoleLogger)
if LogJSONList == []:
quitOnError(f"{Fore.RED} [-] No JSON files found.{Fore.RESET}", consoleLogger)
quitOnError(f"{Fore.RED} [-] No files containing logs found.{Fore.RESET}", consoleLogger)
# Print field list and exit
if args.fieldlist:
@@ -1341,9 +1564,11 @@ def main():
args.rulefilter = [item for sublist in args.rulefilter for item in sublist]
writeMode = "w"
for ruleset in args.ruleset:
consoleLogger.info(f"[+] Loading ruleset from : {ruleset}")
zircoliteCore.loadRulesetFromFile(filename=ruleset, ruleFilters=args.rulefilter)
consoleLogger.info("[+] Loading ruleset(s)")
rulesetsManager = rulesetHandler(consoleLogger, args)
for ruleset in rulesetsManager.Rulesets:
zircoliteCore.loadRulesetFromVar(ruleset=ruleset, ruleFilters=args.rulefilter)
if args.limit > 0:
consoleLogger.info(f"[+] Limited mode : detections with more than {args.limit} events will be discarded")
consoleLogger.info(f"[+] Executing ruleset - {len(zircoliteCore.ruleset)} rules")