Refactor some functions
This commit is contained in:
wagga40
2025-04-06 10:34:48 +02:00
parent d2b421b60a
commit edfc67c866
4 changed files with 270 additions and 199 deletions

82
Taskfile.yml Normal file
View File

@@ -0,0 +1,82 @@
version: '3'
vars:
DOCKER: docker
DOCKER_BUILD_FLAGS: ''
DOCKER_REGISTRY: docker.io
DOCKER_REPO: wagga40/zircolite
RULES_URL: https://github.com/wagga40/Zircolite-Rules/archive/refs/heads/main.tar.gz
PLATFORMS: linux/amd64,linux/arm64
tasks:
default:
deps: [clean]
get-version:
desc: Get the version from zircolite.py
cmds:
- 'echo "Version: {{.VERSION}}"'
vars:
VERSION:
sh: |
cat zircolite.py | grep "version = \"" | cut -d'"' -f2
docker-build:
desc: Build the Docker image
preconditions:
- sh: command -v {{.DOCKER}}
msg: "Docker (https://docs.docker.com/install/) is required. Please install it first"
cmds:
- '{{.DOCKER}} image build --rm --tag {{.DOCKER_REPO}}:dev {{.DOCKER_BUILD_FLAGS}} .'
docker-build-multi-arch:
desc: Build the Docker image for multiple architectures
cmds:
- '{{.DOCKER}} image build --rm --tag {{.DOCKER_REPO}}:{{.VERSION}} --platform {{.PLATFORMS}} {{.DOCKER_BUILD_FLAGS}} .'
- '{{.DOCKER}} image build --rm --tag {{.DOCKER_REPO}}:latest --platform {{.PLATFORMS}} {{.DOCKER_BUILD_FLAGS}} .'
vars:
VERSION:
sh: |
cat zircolite.py | grep "version = \"" | cut -d'"' -f2
docker-push:
desc: Push the Docker image to docker hub
deps: [docker-build-multi-arch]
cmds:
- '{{.DOCKER}} image push {{.DOCKER_REGISTRY}}/{{.DOCKER_REPO}}:{{.VERSION}}'
- '{{.DOCKER}} image push {{.DOCKER_REGISTRY}}/{{.DOCKER_REPO}}:latest'
vars:
VERSION:
sh: |
cat zircolite.py | grep "version = \"" | cut -d'"' -f2
clean:
desc: Remove all default artifacts
cmds:
- rm -rf detected_events.json
- rm -rf ./tmp-*
- rm -f zircolite.log
- rm -f fields.json
- rm -f zircolite.tar
save:
desc: Save the Docker image to an archive
cmds:
- |
if ! command -v docker &> /dev/null; then
echo "Docker (https://docs.docker.com/install/) is required. Please install it first"
exit 1
fi
- '{{.DOCKER}} image save --output zircolite.tar {{.DOCKER_REGISTRY}}/{{.DOCKER_REPO}}:{{.DOCKER_TAG}}'
update-rules:
desc: Update default rulesets using Zircolite-Rules repository
preconditions:
- sh: command -v curl
msg: "curl is required. Please install it first"
prompt: "This task will overwrite the existing rulesets. Are you sure you want to continue?"
cmds:
- curl -sSL {{.RULES_URL}} | tar -xzf -
- defer: rm -rf Zircolite-Rules-main
- mv Zircolite-Rules-main/rules_*.json ./rules/

View File

@@ -6,9 +6,9 @@
"Event.EventData.UserData" : "UserData",
"Event.System.Provider.#attributes.Guid" : "Guid",
"Event.EventData.ContextInfo": "ContextInfo",
"Event.System.Execution.#attributes.ProcessID": "ProcessID",
"Event.System.Execution.#attributes.ThreadID": "ThreadID",
"Event.System.EventID" : "EventID",
"Event.System.Execution.#attributes.ProcessID": "ProcessID",
"Event.System.Execution.#attributes.ThreadID": "ThreadID",
"Event.System.EventID" : "EventID",
"Event.System.EventID.#text" : "EventID",
"Event.System.Channel":"Channel",
"Event.System.Computer":"Computer",

Binary file not shown.

View File

@@ -208,6 +208,8 @@ class JSONFlattener:
return param # Return the original parameter if transform fails
def flatten(x, name=''):
# Sample EVTX event : {"Event":{"#attributes":{"xmlns":"http://schemas.microsoft.com/win/2004/08/events/event"},"System":{"Provider":{"#attributes":{"Name":"Microsoft-Windows-Sysmon","Guid":"5770385F-C22A-43E0-BF4C-06F5698FFBD9"}},"EventID":16,"Version":3,"Level":4,"Task":16,"Opcode":0,"Keywords":"0x8000000000000000","TimeCreated":{"#attributes":{"SystemTime":"2021-03-03T12:47:27.371669Z"}},"EventRecordID":1,"Correlation":null,"Execution":{"#attributes":{"ProcessID":5132,"ThreadID":6404}},"Channel":"Microsoft-Windows-Sysmon/Operational","Computer":"DESKTOP-ET1DJSR","Security":{"#attributes":{"UserID":"S-1-5-21-1250304854-3630730510-2981668747-1001"}}},"EventData":{"UtcTime":"2021-03-03 12:47:27.369","Configuration":"C:\\Users\\user\\Downloads\\sysmonconfig-export.xml","ConfigurationFileHash":"SHA256=EA5133261F8C5D31A30DD852A05B90E804103837310C14A69747BA8367D6CDB5"}}}
nonlocal fieldStmt
# If it is a Dict go deeper
if isinstance(x, dict):
@@ -223,29 +225,25 @@ class JSONFlattener:
value = x
# Excluding useless values (e.g. "null"). The value must be an exact match.
if value not in self.uselessValues:
# Applying field mappings
rawFieldName = name[:-1]
if rawFieldName in self.fieldMappings:
key = self.fieldMappings[rawFieldName]
else:
# Removing all annoying character from field name
key = ''.join(e for e in rawFieldName.split(".")[-1] if e.isalnum())
key = self.fieldMappings.get(rawFieldName, ''.join(e for e in rawFieldName.split(".")[-1] if e.isalnum()))
# Preparing aliases (work on original field name and Mapped field name)
keys = [key]
for fieldName in [key, rawFieldName]:
if fieldName in self.aliases:
keys.append(self.aliases[key])
if key in self.aliases:
keys.append(self.aliases[key])
if rawFieldName in self.aliases:
keys.append(self.aliases[rawFieldName])
# Applying field transforms (work on original field name and Mapped field name)
# Applying field transforms
keysThatNeedTransformedValues = []
transformedValuesByKeys = {}
if self.transforms_enabled:
for fieldName in [key, rawFieldName]:
if fieldName in self.transforms:
for transform in self.transforms[fieldName]:
if transform["enabled"] and self.chosen_input in transform["source_condition"] :
if transform["enabled"] and self.chosen_input in transform["source_condition"]:
transformCode = transform["code"]
# If the transform rule ask for a dedicated alias
if transform["alias"]:
@@ -257,17 +255,17 @@ class JSONFlattener:
# Applying field splitting
fieldsToSplit = []
if rawFieldName in self.fieldSplitList:
if rawFieldName in self.fieldSplitList:
fieldsToSplit.append(rawFieldName)
if key in self.fieldSplitList:
if key in self.fieldSplitList:
fieldsToSplit.append(key)
if len(fieldsToSplit) > 0:
if fieldsToSplit:
for field in fieldsToSplit:
try:
splittedFields = value.split(self.fieldSplitList[field]["separator"])
for splittedField in splittedFields:
k,v = splittedField.split(self.fieldSplitList[field]["equal"])
k, v = splittedField.split(self.fieldSplitList[field]["equal"])
keyLower = k.lower()
JSONLine[k] = v
if keyLower not in self.keyDict:
@@ -286,10 +284,7 @@ class JSONFlattener:
keyLower = key.lower()
if keyLower not in self.keyDict:
self.keyDict[keyLower] = key
if isinstance(value, int):
fieldStmt += f"'{key}' INTEGER,\n"
else:
fieldStmt += f"'{key}' TEXT COLLATE NOCASE,\n"
fieldStmt += f"'{key}' {'INTEGER' if isinstance(value, int) else 'TEXT COLLATE NOCASE'},\n"
# If filesize is not zero
if os.stat(file).st_size != 0:
@@ -303,20 +298,24 @@ class JSONFlattener:
except Exception as e:
self.logger.debug(f'JSON ARRAY ERROR : {e}')
logs = []
for line in logs:
try:
if self.JSONArray:
dictToFlatten = line
else:
dictToFlatten = json.loads(line)
dictToFlatten.update({"OriginalLogfile": filename})
if self.hashes:
dictToFlatten.update({"OriginalLogLinexxHash": xxhash.xxh64_hexdigest(line[:-1])})
dictToFlatten["OriginalLogfile"] = filename
if self.hashes:
dictToFlatten["OriginalLogLinexxHash"] = xxhash.xxh64_hexdigest(line[:-1])
flatten(dictToFlatten)
except Exception as e:
self.logger.debug(f'JSON ERROR : {e}')
# Handle timestamp filters
if (self.timeAfter != "1970-01-01T00:00:00" or self.timeBefore != "9999-12-12T23:59:59") and (self.timeField in JSONLine):
if ((self.timeAfter != "1970-01-01T00:00:00" or self.timeBefore != "9999-12-12T23:59:59")
and (self.timeField in JSONLine)):
try:
timestamp = time.strptime(JSONLine[self.timeField].split(".")[0].replace("Z",""), '%Y-%m-%dT%H:%M:%S')
if timestamp > self.timeAfter and timestamp < self.timeBefore:
@@ -325,7 +324,9 @@ class JSONFlattener:
JSONOutput.append(JSONLine)
else:
JSONOutput.append(JSONLine)
JSONLine = {}
return {"dbFields": fieldStmt, "dbValues": JSONOutput}
def runAll(self, EVTXJSONList):
@@ -874,50 +875,38 @@ class evtxExtractor:
self.logger.debug(f"EXTRACTING : {file}")
filename = Path(file).name
outputJSONFilename = f"{self.tmpDir}/{str(filename)}-{self.randString()}.json"
# Auditd or Sysmon4Linux logs
if self.sysmon4linux or self.auditdLogs:
# Choose which log backend to use
if self.sysmon4linux:
func = self.SysmonXMLLine2JSON
elif self.auditdLogs:
func = self.auditdLine2JSON
try:
try:
# Auditd or Sysmon4Linux logs
if self.sysmon4linux or self.auditdLogs:
func = self.SysmonXMLLine2JSON if self.sysmon4linux else self.auditdLine2JSON
self.Logs2JSON(func, str(file), outputJSONFilename)
except Exception as e:
self.logger.error(f"{Fore.RED} [-] {e}{Fore.RESET}")
# XML logs
elif self.xmlLogs:
try:
data = ""
# We need to read the entire file to remove annoying newlines and fields with newlines (System.evtx Logs for example...)
# XML logs
elif self.xmlLogs:
with open(str(file), 'r', encoding="utf-8") as XMLFile:
data = XMLFile.read().replace("\n","").replace("</Event>","</Event>\n").replace("<Event ","\n<Event ")
self.Logs2JSON(self.XMLLine2JSON, data, outputJSONFilename, isFile=False)
except Exception as e:
self.logger.error(f"{Fore.RED} [-] {e}{Fore.RESET}")
# EVTXtract
elif self.evtxtract:
try:
# EVTXtract
elif self.evtxtract:
self.evtxtract2JSON(str(file), outputJSONFilename)
except Exception as e:
self.logger.error(f"{Fore.RED} [-] {e}{Fore.RESET}")
# CSV
elif self.csvInput:
try:
# CSV
elif self.csvInput:
self.csv2JSON(str(file), outputJSONFilename)
except Exception as e:
self.logger.error(f"{Fore.RED} [-] {e}{Fore.RESET}")
# EVTX
else:
if not self.useExternalBinaries or not Path(self.evtxDumpCmd).is_file():
self.logger.debug(" [-] No external binaries args or evtx_dump is missing")
self.runUsingBindings(file)
# EVTX
else:
try:
if not self.useExternalBinaries or not Path(self.evtxDumpCmd).is_file():
self.logger.debug(" [-] No external binaries args or evtx_dump is missing")
self.runUsingBindings(file)
else:
cmd = [self.evtxDumpCmd, "--no-confirm-overwrite", "-o", "jsonl", str(file), "-f", outputJSONFilename, "-t", str(self.cores)]
subprocess.call(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
except Exception as e:
self.logger.error(f"{Fore.RED} [-] {e}{Fore.RESET}")
except Exception as e:
self.logger.error(f"{Fore.RED} [-] {e}{Fore.RESET}")
def cleanup(self):
shutil.rmtree(self.tmpDir)
@@ -1237,74 +1226,74 @@ def main():
# Init Args handling
parser = argparse.ArgumentParser()
# Input files and filtering/selection options
logsInputArgs = parser.add_argument_group(f'{Fore.BLUE}INPUT FILES AND FILTERING/SELECTION OPTIONS{Fore.RESET}')
logsInputArgs.add_argument("-e", "--evtx", "--events", help="Path to log file or directory containing log files in supported format", type=str)
logsInputArgs.add_argument("-s", "--select", help="Process only files with filenames containing the specified string (applied before exclusions)", action='append', nargs='+')
logsInputArgs.add_argument("-a", "--avoid", help="Skip files with filenames containing the specified string", action='append', nargs='+')
logsInputArgs.add_argument("-f", "--fileext", help="File extension of the log files to process", type=str)
logsInputArgs.add_argument("-fp", "--file-pattern", help="Python Glob pattern to select files (only works with directories)", type=str)
logsInputArgs.add_argument("--no-recursion", help="Search for log files only in the specified directory (disable recursive search)", action="store_true")
logs_input_args = parser.add_argument_group(f'{Fore.BLUE}INPUT FILES AND FILTERING/SELECTION OPTIONS{Fore.RESET}')
logs_input_args.add_argument("-e", "--evtx", "--events", help="Path to log file or directory containing log files in supported format", type=str)
logs_input_args.add_argument("-s", "--select", help="Process only files with filenames containing the specified string (applied before exclusions)", action='append', nargs='+')
logs_input_args.add_argument("-a", "--avoid", help="Skip files with filenames containing the specified string", action='append', nargs='+')
logs_input_args.add_argument("-f", "--fileext", help="File extension of the log files to process", type=str)
logs_input_args.add_argument("-fp", "--file-pattern", help="Python Glob pattern to select files (only works with directories)", type=str)
logs_input_args.add_argument("--no-recursion", help="Search for log files only in the specified directory (disable recursive search)", action="store_true")
# Events filtering options
eventArgs = parser.add_argument_group(f'{Fore.BLUE}EVENTS FILTERING OPTIONS{Fore.RESET}')
eventArgs.add_argument("-A", "--after", help="Process only events after this timestamp (UTC format: 1970-01-01T00:00:00)", type=str, default="1970-01-01T00:00:00")
eventArgs.add_argument("-B", "--before", help="Process only events before this timestamp (UTC format: 1970-01-01T00:00:00)", type=str, default="9999-12-12T23:59:59")
event_args = parser.add_argument_group(f'{Fore.BLUE}EVENTS FILTERING OPTIONS{Fore.RESET}')
event_args.add_argument("-A", "--after", help="Process only events after this timestamp (UTC format: 1970-01-01T00:00:00)", type=str, default="1970-01-01T00:00:00")
event_args.add_argument("-B", "--before", help="Process only events before this timestamp (UTC format: 1970-01-01T00:00:00)", type=str, default="9999-12-12T23:59:59")
# Event and log formats options
# /!\ an option name containing '-input' must exists (It is used in JSON flattening mechanism)
eventFormatsArgs = parser.add_mutually_exclusive_group()
eventFormatsArgs.add_argument("-j", "--json-input", "--jsononly", "--jsonline", "--jsonl", help="Input logs are in JSON lines format", action='store_true')
eventFormatsArgs.add_argument("--json-array-input", "--jsonarray", "--json-array", help="Input logs are in JSON array format", action='store_true')
eventFormatsArgs.add_argument("--db-input", "-D", "--dbonly", help="Use a previously saved database file (time range filters will not work)", action='store_true')
eventFormatsArgs.add_argument("-S", "--sysmon-linux-input", "--sysmon4linux", "--sysmon-linux", help="Process Sysmon for Linux log files (default extension: '.log')", action='store_true')
eventFormatsArgs.add_argument("-AU", "--auditd-input", "--auditd", help="Process Auditd log files (default extension: '.log')", action='store_true')
eventFormatsArgs.add_argument("-x", "--xml-input", "--xml", help="Process EVTX files converted to XML format (default extension: '.xml')", action='store_true')
eventFormatsArgs.add_argument("--evtxtract-input", "--evtxtract", help="Process log files extracted with EVTXtract (default extension: '.log')", action='store_true')
eventFormatsArgs.add_argument("--csv-input", "--csvonly", help="Process log files in CSV format (extension: '.csv')", action='store_true')
event_formats_args = parser.add_mutually_exclusive_group()
event_formats_args.add_argument("-j", "--json-input", "--jsononly", "--jsonline", "--jsonl", help="Input logs are in JSON lines format", action='store_true')
event_formats_args.add_argument("--json-array-input", "--jsonarray", "--json-array", help="Input logs are in JSON array format", action='store_true')
event_formats_args.add_argument("--db-input", "-D", "--dbonly", help="Use a previously saved database file (time range filters will not work)", action='store_true')
event_formats_args.add_argument("-S", "--sysmon-linux-input", "--sysmon4linux", "--sysmon-linux", help="Process Sysmon for Linux log files (default extension: '.log')", action='store_true')
event_formats_args.add_argument("-AU", "--auditd-input", "--auditd", help="Process Auditd log files (default extension: '.log')", action='store_true')
event_formats_args.add_argument("-x", "--xml-input", "--xml", help="Process EVTX files converted to XML format (default extension: '.xml')", action='store_true')
event_formats_args.add_argument("--evtxtract-input", "--evtxtract", help="Process log files extracted with EVTXtract (default extension: '.log')", action='store_true')
event_formats_args.add_argument("--csv-input", "--csvonly", help="Process log files in CSV format (extension: '.csv')", action='store_true')
# Ruleset options
rulesetsFormatsArgs = parser.add_argument_group(f'{Fore.BLUE}RULES AND RULESETS OPTIONS{Fore.RESET}')
rulesetsFormatsArgs.add_argument("-r", "--ruleset", help="Sigma ruleset in JSON (Zircolite format) or YAML/directory of YAML files (Native Sigma format)", action='append', nargs='+')
rulesetsFormatsArgs.add_argument("-nsc", "--no-sigma-conversion", help=argparse.SUPPRESS, action='store_true')
rulesetsFormatsArgs.add_argument("-cr", "--combine-rulesets", help="Merge all provided rulesets into one", action='store_true')
rulesetsFormatsArgs.add_argument("-sr", "--save-ruleset", help="Save converted ruleset (from Sigma to Zircolite format) to disk", action='store_true')
rulesetsFormatsArgs.add_argument("-p", "--pipeline", help="Use specified pipeline for native Sigma rulesets (YAML). Examples: 'sysmon', 'windows-logsources', 'windows-audit'. Use '--pipeline-list' to see available pipelines.", action='append', nargs='+')
rulesetsFormatsArgs.add_argument("-pl", "--pipeline-list", help="List all installed pysigma pipelines", action='store_true')
rulesetsFormatsArgs.add_argument("-pn", "--pipeline-null", help="Don't use any pipeline for native Sigma rulesets (Default)", action='store_true')
rulesetsFormatsArgs.add_argument("-R", "--rulefilter", help="Remove rules from ruleset by matching rule title (case sensitive)", action='append', nargs='*')
rulesets_formats_args = parser.add_argument_group(f'{Fore.BLUE}RULES AND RULESETS OPTIONS{Fore.RESET}')
rulesets_formats_args.add_argument("-r", "--ruleset", help="Sigma ruleset in JSON (Zircolite format) or YAML/directory of YAML files (Native Sigma format)", action='append', nargs='+')
rulesets_formats_args.add_argument("-nsc", "--no-sigma-conversion", help=argparse.SUPPRESS, action='store_true')
rulesets_formats_args.add_argument("-cr", "--combine-rulesets", help="Merge all provided rulesets into one", action='store_true')
rulesets_formats_args.add_argument("-sr", "--save-ruleset", help="Save converted ruleset (from Sigma to Zircolite format) to disk", action='store_true')
rulesets_formats_args.add_argument("-p", "--pipeline", help="Use specified pipeline for native Sigma rulesets (YAML). Examples: 'sysmon', 'windows-logsources', 'windows-audit'. Use '--pipeline-list' to see available pipelines.", action='append', nargs='+')
rulesets_formats_args.add_argument("-pl", "--pipeline-list", help="List all installed pysigma pipelines", action='store_true')
rulesets_formats_args.add_argument("-pn", "--pipeline-null", help="Don't use any pipeline for native Sigma rulesets (Default)", action='store_true')
rulesets_formats_args.add_argument("-R", "--rulefilter", help="Remove rules from ruleset by matching rule title (case sensitive)", action='append', nargs='*')
# Ouput formats and output files options
outputFormatsArgs = parser.add_argument_group(f'{Fore.BLUE}OUPUT FORMATS AND OUTPUT FILES OPTIONS{Fore.RESET}')
outputFormatsArgs.add_argument("-o", "--outfile", help="Output file for detected events", type=str, default="detected_events.json")
outputFormatsArgs.add_argument("--csv", "--csv-output", help="Output results in CSV format (empty fields will be included)", action='store_true')
outputFormatsArgs.add_argument("--csv-delimiter", help="Delimiter for CSV output", type=str, default=";")
outputFormatsArgs.add_argument("-t", "--tmpdir", help="Temporary directory for JSON-converted events (parent directories must exist)", type=str)
outputFormatsArgs.add_argument("-k", "--keeptmp", help="Keep the temporary directory with JSON-converted events", action='store_true')
outputFormatsArgs.add_argument("--keepflat", help="Save flattened events as JSON", action='store_true')
outputFormatsArgs.add_argument("-d", "--dbfile", help="Save all logs to a SQLite database file", type=str)
outputFormatsArgs.add_argument("-l", "--logfile", help="Log file name", default="zircolite.log", type=str)
outputFormatsArgs.add_argument("--hashes", help="Add xxhash64 of the original log event to each event", action='store_true')
outputFormatsArgs.add_argument("-L", "--limit", "--limit-results", help="Discard results exceeding this limit from output file", type=int, default=-1)
output_formats_args = parser.add_argument_group(f'{Fore.BLUE}OUPUT FORMATS AND OUTPUT FILES OPTIONS{Fore.RESET}')
output_formats_args.add_argument("-o", "--outfile", help="Output file for detected events", type=str, default="detected_events.json")
output_formats_args.add_argument("--csv", "--csv-output", help="Output results in CSV format (empty fields will be included)", action='store_true')
output_formats_args.add_argument("--csv-delimiter", help="Delimiter for CSV output", type=str, default=";")
output_formats_args.add_argument("-t", "--tmpdir", help="Temporary directory for JSON-converted events (parent directories must exist)", type=str)
output_formats_args.add_argument("-k", "--keeptmp", help="Keep the temporary directory with JSON-converted events", action='store_true')
output_formats_args.add_argument("--keepflat", help="Save flattened events as JSON", action='store_true')
output_formats_args.add_argument("-d", "--dbfile", help="Save all logs to a SQLite database file", type=str)
output_formats_args.add_argument("-l", "--logfile", help="Log file name", default="zircolite.log", type=str)
output_formats_args.add_argument("--hashes", help="Add xxhash64 of the original log event to each event", action='store_true')
output_formats_args.add_argument("-L", "--limit", "--limit-results", help="Discard results exceeding this limit from output file", type=int, default=-1)
# Advanced configuration options
configFormatsArgs = parser.add_argument_group(f'{Fore.BLUE}ADVANCED CONFIGURATION OPTIONS{Fore.RESET}')
configFormatsArgs.add_argument("-c", "--config", help="JSON file containing field mappings and exclusions", type=str, default="config/fieldMappings.json")
eventFormatsArgs.add_argument("-LE", "--logs-encoding", help="Specify encoding for Sysmon for Linux or Auditd files", type=str)
configFormatsArgs.add_argument("--fieldlist", help="List all event fields", action='store_true')
configFormatsArgs.add_argument("--evtx_dump", help="Path to evtx_dump binary for EVTX conversion (on Linux/MacOS use './evtx_dump' format)", type=str, default=None)
configFormatsArgs.add_argument("--noexternal", "--bindings", help="Use Python bindings instead of external evtx_dump binaries (slower)", action='store_true')
configFormatsArgs.add_argument("--cores", help="Number of CPU cores to use for EVTX extraction (default: all cores)", type=str)
configFormatsArgs.add_argument("--debug", help="Enable debug logging", action='store_true')
configFormatsArgs.add_argument("--imports", help="Show detailed module import errors", action='store_true')
configFormatsArgs.add_argument("--showall", help="Show all events (helps identify slow rules)", action='store_true')
configFormatsArgs.add_argument("-n", "--nolog", help="Don't create log or result files", action='store_true')
configFormatsArgs.add_argument("--ondiskdb", help="Use on-disk database instead of in-memory (slower but uses less RAM)", type=str, default=":memory:")
configFormatsArgs.add_argument("-RE", "--remove-events", help="Remove processed log files after successful analysis (use with caution)", action='store_true')
configFormatsArgs.add_argument("-U", "--update-rules", help="Update rulesets in the 'rules' directory", action='store_true')
configFormatsArgs.add_argument("-v", "--version", help="Display Zircolite version", action='store_true')
configFormatsArgs.add_argument("--timefield", help="Specify time field name for event forwarding (default: 'SystemTime')", default="SystemTime", action="store_true")
config_formats_args = parser.add_argument_group(f'{Fore.BLUE}ADVANCED CONFIGURATION OPTIONS{Fore.RESET}')
config_formats_args.add_argument("-c", "--config", help="JSON file containing field mappings and exclusions", type=str, default="config/fieldMappings.json")
event_formats_args.add_argument("-LE", "--logs-encoding", help="Specify encoding for Sysmon for Linux or Auditd files", type=str)
config_formats_args.add_argument("--fieldlist", help="List all event fields", action='store_true')
config_formats_args.add_argument("--evtx_dump", help="Path to evtx_dump binary for EVTX conversion (on Linux/MacOS use './evtx_dump' format)", type=str, default=None)
config_formats_args.add_argument("--noexternal", "--bindings", help="Use Python bindings instead of external evtx_dump binaries (slower)", action='store_true')
config_formats_args.add_argument("--cores", help="Number of CPU cores to use for EVTX extraction (default: all cores)", type=str)
config_formats_args.add_argument("--debug", help="Enable debug logging", action='store_true')
config_formats_args.add_argument("--imports", help="Show detailed module import errors", action='store_true')
config_formats_args.add_argument("--showall", help="Show all events (helps identify slow rules)", action='store_true')
config_formats_args.add_argument("-n", "--nolog", help="Don't create log or result files", action='store_true')
config_formats_args.add_argument("--ondiskdb", help="Use on-disk database instead of in-memory (slower but uses less RAM)", type=str, default=":memory:")
config_formats_args.add_argument("-RE", "--remove-events", help="Remove processed log files after successful analysis (use with caution)", action='store_true')
config_formats_args.add_argument("-U", "--update-rules", help="Update rulesets in the 'rules' directory", action='store_true')
config_formats_args.add_argument("-v", "--version", help="Display Zircolite version", action='store_true')
config_formats_args.add_argument("--timefield", help="Specify time field name for event forwarding (default: 'SystemTime')", default="SystemTime", action="store_true")
# Templating and Mini GUI options
templatingFormatsArgs = parser.add_argument_group(f'{Fore.BLUE}TEMPLATING AND MINI GUI OPTIONS{Fore.RESET}')
templatingFormatsArgs.add_argument("--template", help="Jinja2 template to use for output generation", type=str, action='append', nargs='+')
templatingFormatsArgs.add_argument("--templateOutput", help="Output file for Jinja2 template results", type=str, action='append', nargs='+')
templatingFormatsArgs.add_argument("--package", help="Create a ZircoGui/Mini GUI package", action='store_true')
templatingFormatsArgs.add_argument("--package-dir", help="Directory to save the ZircoGui/Mini GUI package", type=str, default="")
templating_formats_args = parser.add_argument_group(f'{Fore.BLUE}TEMPLATING AND MINI GUI OPTIONS{Fore.RESET}')
templating_formats_args.add_argument("--template", help="Jinja2 template to use for output generation", type=str, action='append', nargs='+')
templating_formats_args.add_argument("--templateOutput", help="Output file for Jinja2 template results", type=str, action='append', nargs='+')
templating_formats_args.add_argument("--package", help="Create a ZircoGui/Mini GUI package", action='store_true')
templating_formats_args.add_argument("--package-dir", help="Directory to save the ZircoGui/Mini GUI package", type=str, default="")
args = parser.parse_args()
signal.signal(signal.SIGINT, signal_handler)
@@ -1312,9 +1301,9 @@ def main():
# Init logging
if args.nolog:
args.logfile = None
consoleLogger = initLogger(args.debug, args.logfile)
console_logger = initLogger(args.debug, args.logfile)
consoleLogger.info("""
console_logger.info("""
███████╗██╗██████╗ ██████╗ ██████╗ ██╗ ██╗████████╗███████╗
╚══███╔╝██║██╔══██╗██╔════╝██╔═══██╗██║ ██║╚══██╔══╝██╔════╝
███╔╝ ██║██████╔╝██║ ██║ ██║██║ ██║ ██║ █████╗
@@ -1326,22 +1315,22 @@ def main():
# Print version and quit
if args.version:
consoleLogger.info(f"Zircolite - v{version}")
console_logger.info(f"Zircolite - v{version}")
sys.exit(0)
# Show imports status
importsMessage, args, mustQuit = ImportErrorHandler(args)
if importsMessage:
consoleLogger.info(f"[+] Modules imports status: \n{importsMessage}")
imports_message, args, must_quit = ImportErrorHandler(args)
if imports_message:
console_logger.info(f"[+] Modules imports status: \n{imports_message}")
else:
consoleLogger.info("[+] Modules imports status: OK")
if mustQuit:
console_logger.info("[+] Modules imports status: OK")
if must_quit:
sys.exit(1)
# Update rulesets
if args.update_rules:
consoleLogger.info("[+] Updating rules")
updater = rulesUpdater(logger=consoleLogger)
console_logger.info("[+] Updating rules")
updater = rulesUpdater(logger=console_logger)
updater.run()
sys.exit(0)
@@ -1352,54 +1341,54 @@ def main():
args.ruleset = ["rules/rules_windows_generic_pysigma.json"]
# Loading rulesets
consoleLogger.info("[+] Loading ruleset(s)")
rulesetsManager = rulesetHandler(consoleLogger, args, args.pipeline_list)
console_logger.info("[+] Loading ruleset(s)")
rulesets_manager = rulesetHandler(console_logger, args, args.pipeline_list)
if args.pipeline_list:
sys.exit(0)
# Check mandatory CLI options
if not args.evtx:
consoleLogger.error(f"{Fore.RED} [-] No events source path provided. Use '-e <PATH TO LOGS>', '--events <PATH TO LOGS>'{Fore.RESET}")
console_logger.error(f"{Fore.RED} [-] No events source path provided. Use '-e <PATH TO LOGS>', '--events <PATH TO LOGS>'{Fore.RESET}")
sys.exit(2)
if args.csv and len(args.ruleset) > 1:
consoleLogger.error(f"{Fore.RED} [-] Since fields in results can change between rulesets, it is not possible to have CSV output when using multiple rulesets{Fore.RESET}")
console_logger.error(f"{Fore.RED} [-] Since fields in results can change between rulesets, it is not possible to have CSV output when using multiple rulesets{Fore.RESET}")
sys.exit(2)
consoleLogger.info("[+] Checking prerequisites")
console_logger.info("[+] Checking prerequisites")
# Checking provided timestamps
try:
eventsAfter = time.strptime(args.after, '%Y-%m-%dT%H:%M:%S')
eventsBefore = time.strptime(args.before, '%Y-%m-%dT%H:%M:%S')
events_after = time.strptime(args.after, '%Y-%m-%dT%H:%M:%S')
events_before = time.strptime(args.before, '%Y-%m-%dT%H:%M:%S')
except Exception:
quitOnError(f"{Fore.RED} [-] Wrong timestamp format. Please use 'YYYY-MM-DDTHH:MM:SS'", consoleLogger)
quitOnError(f"{Fore.RED} [-] Wrong timestamp format. Please use 'YYYY-MM-DDTHH:MM:SS'", console_logger)
# Check templates args
readyForTemplating = False
ready_for_templating = False
if args.template is not None:
if args.csv:
quitOnError(f"{Fore.RED} [-] You cannot use templates in CSV mode{Fore.RESET}", consoleLogger)
quitOnError(f"{Fore.RED} [-] You cannot use templates in CSV mode{Fore.RESET}", console_logger)
if args.templateOutput is None or len(args.template) != len(args.templateOutput):
quitOnError(f"{Fore.RED} [-] Number of templates output must match number of templates{Fore.RESET}", consoleLogger)
quitOnError(f"{Fore.RED} [-] Number of templates output must match number of templates{Fore.RESET}", console_logger)
for template in args.template:
checkIfExists(template[0], f"{Fore.RED} [-] Cannot find template: {template[0]}. Default templates are available here: https://github.com/wagga40/Zircolite/tree/master/templates{Fore.RESET}", consoleLogger)
readyForTemplating = True
checkIfExists(template[0], f"{Fore.RED} [-] Cannot find template: {template[0]}. Default templates are available here: https://github.com/wagga40/Zircolite/tree/master/templates{Fore.RESET}", console_logger)
ready_for_templating = True
# Change output filename in CSV mode
if args.csv:
readyForTemplating = False
ready_for_templating = False
if args.outfile == "detected_events.json":
args.outfile = "detected_events.csv"
# If on-disk DB already exists, quit
if args.ondiskdb != ":memory:" and Path(args.ondiskdb).is_file():
quitOnError(f"{Fore.RED} [-] On-disk database already exists{Fore.RESET}", consoleLogger)
quitOnError(f"{Fore.RED} [-] On-disk database already exists{Fore.RESET}", console_logger)
# Start time counting
start_time = time.time()
# Initialize zirCore
zircoliteCore = zirCore(args.config, logger=consoleLogger, noOutput=args.nolog, timeAfter=eventsAfter, timeBefore=eventsBefore, limit=args.limit, csvMode=args.csv, timeField=args.timefield, hashes=args.hashes, dbLocation=args.ondiskdb, delimiter=args.csv_delimiter)
zircolite_core = zirCore(args.config, logger=console_logger, noOutput=args.nolog, timeAfter=events_after, timeBefore=events_before, limit=args.limit, csvMode=args.csv, timeField=args.timefield, hashes=args.hashes, dbLocation=args.ondiskdb, delimiter=args.csv_delimiter)
# If we are not working directly with the db
if not args.db_input:
@@ -1417,32 +1406,32 @@ def main():
args.fileext = "evtx"
# Find log files based on path and pattern
LogPath = Path(args.evtx)
if LogPath.is_dir():
log_path = Path(args.evtx)
if log_path.is_dir():
pattern = f"*.{args.fileext}"
if args.file_pattern not in [None, ""]:
pattern = args.file_pattern
# Use appropriate glob function based on recursion setting
fnGlob = LogPath.rglob if not args.no_recursion else LogPath.glob
LogList = list(fnGlob(pattern))
elif LogPath.is_file():
LogList = [LogPath]
fn_glob = log_path.rglob if not args.no_recursion else log_path.glob
log_list = list(fn_glob(pattern))
elif log_path.is_file():
log_list = [log_path]
else:
quitOnError(f"{Fore.RED} [-] Unable to find events from submitted path{Fore.RESET}", consoleLogger)
quitOnError(f"{Fore.RED} [-] Unable to find events from submitted path{Fore.RESET}", console_logger)
# Apply file filters
FileList = avoidFiles(selectFiles(LogList, args.select), args.avoid)
if not FileList:
quitOnError(f"{Fore.RED} [-] No file found. Please verify filters, directory or the extension with '--fileext' or '--file-pattern'{Fore.RESET}", consoleLogger)
file_list = avoidFiles(selectFiles(log_list, args.select), args.avoid)
if not file_list:
quitOnError(f"{Fore.RED} [-] No file found. Please verify filters, directory or the extension with '--fileext' or '--file-pattern'{Fore.RESET}", console_logger)
# Process logs based on input type
if args.json_input or args.json_array_input:
LogJSONList = FileList
log_json_list = file_list
else:
# Initialize extractor for non-JSON formats
extractor = evtxExtractor(
logger=consoleLogger,
logger=console_logger,
providedTmpDir=args.tmpdir,
coreCount=args.cores,
useExternalBinaries=(not args.noexternal),
@@ -1456,94 +1445,94 @@ def main():
)
# Extract events
consoleLogger.info(f"[+] Extracting events Using '{extractor.tmpDir}' directory ")
for evtx in tqdm(FileList, colour="yellow"):
console_logger.info(f"[+] Extracting events Using '{extractor.tmpDir}' directory ")
for evtx in tqdm(file_list, colour="yellow"):
extractor.run(evtx)
# Set path for extracted JSON files
LogJSONList = list(Path(extractor.tmpDir).rglob("*.json"))
log_json_list = list(Path(extractor.tmpDir).rglob("*.json"))
# Verify config file exists
checkIfExists(args.config, f"{Fore.RED} [-] Cannot find mapping file, you can get the default one here : https://github.com/wagga40/Zircolite/blob/master/config/fieldMappings.json {Fore.RESET}", consoleLogger)
checkIfExists(args.config, f"{Fore.RED} [-] Cannot find mapping file, you can get the default one here : https://github.com/wagga40/Zircolite/blob/master/config/fieldMappings.json {Fore.RESET}", console_logger)
if not LogJSONList:
quitOnError(f"{Fore.RED} [-] No files containing logs found.{Fore.RESET}", consoleLogger)
if not log_json_list:
quitOnError(f"{Fore.RED} [-] No files containing logs found.{Fore.RESET}", console_logger)
# Print field list and exit if requested
if args.fieldlist:
fields = zircoliteCore.run(LogJSONList, Insert2Db=False, args_config=args)
zircoliteCore.close()
fields = zircolite_core.run(log_json_list, Insert2Db=False, args_config=args)
zircolite_core.close()
if not (args.json_input or args.json_array_input or args.keeptmp):
extractor.cleanup()
[print(sortedField) for sortedField in sorted([field for field in fields.values()])]
[print(sorted_field) for sorted_field in sorted([field for field in fields.values()])]
sys.exit(0)
# Process logs and insert into database
zircoliteCore.run(LogJSONList, saveToFile=args.keepflat, args_config=args)
zircolite_core.run(log_json_list, saveToFile=args.keepflat, args_config=args)
# Save in-memory DB to disk if requested
if args.dbfile is not None:
zircoliteCore.saveDbToDisk(args.dbfile)
zircolite_core.saveDbToDisk(args.dbfile)
else:
consoleLogger.info(f"[+] Creating model from disk : {args.evtx}")
zircoliteCore.loadDbInMemory(args.evtx)
console_logger.info(f"[+] Creating model from disk : {args.evtx}")
zircolite_core.loadDbInMemory(args.evtx)
# flatten array of "rulefilter" arguments
if args.rulefilter:
args.rulefilter = [item for sublist in args.rulefilter for item in sublist]
writeMode = "w"
for i, ruleset in enumerate(rulesetsManager.Rulesets):
zircoliteCore.loadRulesetFromVar(ruleset=ruleset, ruleFilters=args.rulefilter)
write_mode = "w"
for i, ruleset in enumerate(rulesets_manager.Rulesets):
zircolite_core.loadRulesetFromVar(ruleset=ruleset, ruleFilters=args.rulefilter)
if args.limit > 0:
consoleLogger.info(f"[+] Limited mode : detections with more than {args.limit} events will be discarded")
console_logger.info(f"[+] Limited mode : detections with more than {args.limit} events will be discarded")
consoleLogger.info(f"[+] Executing ruleset - {len(zircoliteCore.ruleset)} rules")
isLastRuleset = (i == len(rulesetsManager.Rulesets) - 1)
zircoliteCore.executeRuleset(
console_logger.info(f"[+] Executing ruleset - {len(zircolite_core.ruleset)} rules")
is_last_ruleset = (i == len(rulesets_manager.Rulesets) - 1)
zircolite_core.executeRuleset(
args.outfile,
writeMode=writeMode,
writeMode=write_mode,
showAll=args.showall,
KeepResults=(readyForTemplating or args.package),
lastRuleset=isLastRuleset
KeepResults=(ready_for_templating or args.package),
lastRuleset=is_last_ruleset
)
writeMode = "a" # Next iterations will append to results file
write_mode = "a" # Next iterations will append to results file
consoleLogger.info(f"[+] Results written in : {args.outfile}")
console_logger.info(f"[+] Results written in : {args.outfile}")
# Process templates if needed
if readyForTemplating and zircoliteCore.fullResults:
templateGenerator = templateEngine(consoleLogger, args.template, args.templateOutput, args.timefield)
templateGenerator.run(zircoliteCore.fullResults)
if ready_for_templating and zircolite_core.fullResults:
template_generator = templateEngine(console_logger, args.template, args.templateOutput, args.timefield)
template_generator.run(zircolite_core.fullResults)
# Generate ZircoGui package if requested
if args.package and zircoliteCore.fullResults:
templatePath = Path("templates/exportForZircoGui.tmpl")
guiZipPath = Path("gui/zircogui.zip")
if templatePath.is_file() and guiZipPath.is_file():
packager = zircoGuiGenerator(str(guiZipPath), str(templatePath), consoleLogger, args.timefield)
packager.generate(zircoliteCore.fullResults, args.package_dir)
if args.package and zircolite_core.fullResults:
template_path = Path("templates/exportForZircoGui.tmpl")
gui_zip_path = Path("gui/zircogui.zip")
if template_path.is_file() and gui_zip_path.is_file():
packager = zircoGuiGenerator(str(gui_zip_path), str(template_path), console_logger, args.timefield)
packager.generate(zircolite_core.fullResults, args.package_dir)
# Cleanup temporary files
if not args.keeptmp:
consoleLogger.info("[+] Cleaning")
console_logger.info("[+] Cleaning")
try:
if not (args.json_input or args.json_array_input or args.db_input):
extractor.cleanup()
except OSError as e:
consoleLogger.error(f"{Fore.RED} [-] Error during cleanup {e}{Fore.RESET}")
console_logger.error(f"{Fore.RED} [-] Error during cleanup {e}{Fore.RESET}")
# Remove original event files if requested
if args.remove_events:
for evtx in LogList:
for evtx in log_list:
try:
os.remove(evtx)
except OSError as e:
consoleLogger.error(f"{Fore.RED} [-] Cannot remove file {e}{Fore.RESET}")
console_logger.error(f"{Fore.RED} [-] Cannot remove file {e}{Fore.RESET}")
zircoliteCore.close()
consoleLogger.info(f"\nFinished in {int((time.time() - start_time))} seconds")
zircolite_core.close()
console_logger.info(f"\nFinished in {int((time.time() - start_time))} seconds")
if __name__ == "__main__":
main()