pySigma Elasticsearch backend supporting Lucene, ES|QL (with correlations) and EQL queries
pySigma Elasticsearch Backend
This is the Elasticsearch backend for pySigma. It provides the package sigma.backends.elasticsearch with the LuceneBackend class.
It supports the following output formats:
- default: Lucene queries.
- dsl_lucene: DSL with embedded Lucene queries.
- eql: Elastic Event Query Language queries.
- kibana_ndjson: Kibana NDJSON with Lucene queries.
Further, it contains the following processing pipelines in sigma.pipelines.elasticsearch:
- ecs_windows in windows submodule: ECS mapping for Windows event logs ingested with Winlogbeat.
- ecs_windows_old in windows submodule: ECS mapping for Windows event logs ingested with Winlogbeat <= 6.x.
- ecs_zeek_beats in zeek submodule: Zeek ECS mapping from Elastic.
- ecs_zeek_corelight in zeek submodule: Zeek ECS mapping from Corelight.
- zeek_raw in zeek submodule: Zeek raw JSON log field naming.
- ecs_kubernetes in kubernetes submodule: ECS mapping for Kubernetes audit logs ingested with the Kubernetes integration.
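As a rough illustration of how the backend class and a pipeline fit together, the sketch below converts a small rule to Lucene with the ecs_windows pipeline applied. The rule is made up for this example, and the imports are guarded so the script still runs when pySigma or this backend is not installed.

```python
from typing import Optional

# Hypothetical rule, written only for this example.
RULE_YAML = """
title: Whoami Execution (example)
status: test
logsource:
    category: process_creation
    product: windows
detection:
    selection:
        Image|endswith: '\\whoami.exe'
    condition: selection
"""


def convert_to_lucene(rule_yaml: str) -> Optional[list]:
    """Return one Lucene query per rule, or None if pySigma is unavailable."""
    try:
        from sigma.backends.elasticsearch import LuceneBackend
        from sigma.collection import SigmaCollection
        from sigma.pipelines.elasticsearch.windows import ecs_windows
    except ImportError:
        return None

    # The pipeline maps Sigma field names to ECS before the query is built.
    backend = LuceneBackend(ecs_windows())
    rules = SigmaCollection.from_yaml(rule_yaml)
    # No output format given, so the "default" format (plain Lucene) is used.
    return backend.convert(rules)


if __name__ == "__main__":
    queries = convert_to_lucene(RULE_YAML)
    if queries is None:
        print("pySigma or the Elasticsearch backend is not installed")
    else:
        for query in queries:
            print(query)
```

Passing an output format name such as "kibana_ndjson" as the second argument to convert selects one of the other formats listed above.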
This backend is currently maintained by:
Further maintainers required! Send a message to Thomas if you want to co-maintain this backend.
Formats vs. Query Post Processing
The built-in formats aim to support the minimum compatible output, so they can't fit everyone's needs. This gap is filled by a feature called "query post processing", available since pySigma v0.10.
For further information please read "Introducing Query Post-Processing and Output Finalization to Processing Pipelines".
Lucene Kibana NDJSON
Instead of using the format -t lucene -f kibana_ndjson, you can also use the following query postprocessing pipeline to get the same output, or use it as a starting point for your own customizations.
# lucene-kibana-ndjson.yml
postprocessing:
  - type: template
    template: |+
      {"id": "{{ rule.id }}", "type": "search", "attributes": {"title": "SIGMA - {{ rule.title }}", "description": "{{ rule.description }}", "hits": 0, "columns": [], "sort": ["@timestamp", "desc"], "version": 1, "kibanaSavedObjectMeta": {"searchSourceJSON": "{\"index\": \"beats-*\", \"filter\": [], \"highlight\": {\"pre_tags\": [\"@kibana-highlighted-field@\"], \"post_tags\": [\"@/kibana-highlighted-field@\"], \"fields\": {\"*\": {}}, \"require_field_match\": false, \"fragment_size\": 2147483647}, \"query\": {\"query_string\": {\"query\": \"{{ query }}\", \"analyze_wildcard\": true}}}"}}, "references": [{"id": "beats-*", "name": "kibanaSavedObjectMeta.searchSourceJSON.index", "type": "index-pattern"}]}
Use this pipeline with -t lucene -p lucene-kibana-ndjson.yml, but now without -f kibana_ndjson.
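In the template, the query sits inside searchSourceJSON, which is itself a JSON string stored inside the saved object, so the query is effectively JSON-encoded twice. The Python sketch below rebuilds the same structure with the standard library to make that nesting visible. The function name and arguments are hypothetical, and this is not how the backend produces its output.

```python
import json


def kibana_saved_search(rule_id: str, title: str, description: str,
                        query: str, index: str = "beats-*") -> str:
    """Build one NDJSON line shaped like the template above (illustrative)."""
    search_source = {
        "index": index,
        "filter": [],
        "highlight": {
            "pre_tags": ["@kibana-highlighted-field@"],
            "post_tags": ["@/kibana-highlighted-field@"],
            "fields": {"*": {}},
            "require_field_match": False,
            "fragment_size": 2147483647,
        },
        "query": {"query_string": {"query": query, "analyze_wildcard": True}},
    }
    saved_object = {
        "id": rule_id,
        "type": "search",
        "attributes": {
            "title": f"SIGMA - {title}",
            "description": description,
            "hits": 0,
            "columns": [],
            "sort": ["@timestamp", "desc"],
            "version": 1,
            # First encoding: the search source becomes a JSON string value.
            "kibanaSavedObjectMeta": {"searchSourceJSON": json.dumps(search_source)},
        },
        "references": [
            {
                "id": index,
                "name": "kibanaSavedObjectMeta.searchSourceJSON.index",
                "type": "index-pattern",
            }
        ],
    }
    # Second encoding: the whole object becomes a single NDJSON line.
    return json.dumps(saved_object)


if __name__ == "__main__":
    print(kibana_saved_search("example-id", "Example", "Demo rule",
                              'process.name:"cmd.exe"'))
```

Decoding the line and then decoding searchSourceJSON returns the original query unchanged, which is a quick way to check that a customized template escapes quotes correctly.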
Lucene Kibana SIEM Rule
Instead of using the format -t lucene -f siem_rule, you can also use the following query postprocessing pipeline to get the same output, or use it as a starting point for your own customizations.
# lucene-kibana-siemrule.yml
vars:
  index_names:
    - "apm-*-transaction*"
    - "auditbeat-*"
    - "endgame-*"
    - "filebeat-*"
    - "logs-*"
    - "packetbeat-*"
    - "traces-apm*"
    - "winlogbeat-*"
    - "-*elastic-cloud-logs-*"
  schedule_interval: 5
  schedule_interval_unit: m
postprocessing:
  - type: template
    template: |+
      {
        "name": "SIGMA - {{ rule.title }}",
        "consumer": "siem",
        "enabled": true,
        "throttle": null,
        "schedule": {
          "interval": "{{ pipeline.vars.schedule_interval }}{{ pipeline.vars.schedule_interval_unit }}"
        },
        "params": {
          "author": [
          {% if rule.author is string -%}
            "{{rule.author}}"
          {% else %}
          {% for a in rule.author -%}
            "{{ a }}"{% if not loop.last %},{%endif%}
          {% endfor -%}
          {% endif -%}
          ],
          "description": "{{ rule.description }}",
          "ruleId": "{{ rule.id }}",
          "falsePositives": {{ rule.falsepositives | tojson }},
          "from": "now-{{ pipeline.vars.schedule_interval }}{{ pipeline.vars.schedule_interval_unit }}",
          "immutable": false,
          "license": "DRL",
          "outputIndex": "",
          "meta": {
            "from": "1m"
          },
          "maxSignals": 100,
          "riskScore": {{ backend.severity_risk_mapping[rule.level.name] if rule.level is not none else 21 }},
          "riskScoreMapping": [],
          "severity": "{{ rule.level.name | string | lower if rule.level is not none else 'low' }}",
          "severityMapping": [],
          "threat": {{ backend.finalize_output_threat_model(rule.tags) | list | tojson }},
          "to": "now",
          "references": {{ rule.references |tojson(indent=6)}},
          "version": 1,
          "exceptionsList": [],
          "relatedIntegrations": [],
          "requiredFields": [],
          "setup": "",
          "type": "query",
          "language": "lucene",
          "index": {{ pipeline.vars.index_names | tojson(indent=6)}},
          "query": "{{ query }}",
          "filters": []
        },
        "rule_type_id": "siem.queryRule",
        "tags": [
          {% for n in rule.tags -%}
            "{{ n.namespace }}-{{ n.name }}"{% if not loop.last %},{%endif%}
          {% endfor -%}
        ],
        "notify_when": "onActiveAlert",
        "actions": []
      }
Use this pipeline with -t lucene -p lucene-kibana-siemrule.yml, but now without -f siem_rule.
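The schedule and tag fields in this template are all derived from a few inputs: the two schedule variables are concatenated into the interval, the same string is reused for the "from" window, and each tag is rendered as namespace-name. A minimal Python sketch of that derivation follows. The helper names are hypothetical and only mirror the template expressions, they are not part of the backend.

```python
from typing import Iterable, List, Tuple


def schedule_fields(interval: int, unit: str) -> dict:
    """Mirror the template: interval "5m" gives from "now-5m" and to "now"."""
    window = f"{interval}{unit}"
    return {"interval": window, "from": f"now-{window}", "to": "now"}


def tag_labels(tags: Iterable[Tuple[str, str]]) -> List[str]:
    """Render (namespace, name) pairs the way the tags loop does."""
    return [f"{namespace}-{name}" for namespace, name in tags]


if __name__ == "__main__":
    print(schedule_fields(5, "m"))
    print(tag_labels([("attack", "t1059"), ("attack", "execution")]))
```

Changing schedule_interval or schedule_interval_unit in vars therefore moves both the rule interval and its look-back window together.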
ESQL siem_rule_ndjson
Instead of using the format -t esql -f siem_rule_ndjson, you can also use the following query postprocessing pipeline to get the same output, or use it as a starting point for your own customizations.
# esql-siemrule-ndjson.yml
vars:
  schedule_interval: 5
  schedule_interval_unit: m
postprocessing:
  - type: template
    template: |+
      {%- set tags = [] -%}
      {% for n in rule.tags %}
        {%- set tag_string = n.namespace ~ '-' ~ n.name -%}
        {%- set tags=tags.append(tag_string) -%}
      {% endfor %}
      {%- set rule_data = {
        "name": rule.title,
        "id": rule.id | lower,
        "author": [rule.author] if rule.author is string else rule.author,
        "description": rule.description,
        "references": rule.references,
        "enabled": true,
        "interval": pipeline.vars.schedule_interval|string ~ pipeline.vars.schedule_interval_unit,
        "from": "now-" ~ pipeline.vars.schedule_interval|string ~ pipeline.vars.schedule_interval_unit,
        "rule_id": rule.id | lower,
        "false_positives": rule.falsepositives,
        "immutable": false,
        "output_index": "",
        "meta": {
          "from": "1m"
        },
        "risk_score": backend.severity_risk_mapping[rule.level.name] if rule.level is not none else 21,
        "severity": rule.level.name | string | lower if rule.level is not none else "low",
        "severity_mapping": [],
        "threat": backend.finalize_output_threat_model(rule.tags) | list,
        "to": "now",
        "version": 1,
        "max_signals": 100,
        "exceptions_list": [],
        "setup": "",
        "type": "esql",
        "note": "",
        "license": "DRL",
        "language": "esql",
        "index": pipeline.vars.index_names | list,
        "query": query,
        "tags": tags,
        "actions": [],
        "related_integrations": [],
        "required_fields": [],
        "risk_score_mapping": []
      }
      -%}
      {{ rule_data | tojson }}
Use this pipeline with -t esql -p esql-siemrule-ndjson.yml, but now without -f siem_rule_ndjson.
The output can be imported directly into Kibana as a Detection Rule.
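Before importing, it can help to confirm that every line of the generated file is valid JSON and carries the fields the template sets. The sketch below does this with the standard library only. The list of required keys is an assumption taken from the template above, not from Kibana's own schema, and the function name is hypothetical.

```python
import json
from typing import List

# Assumed minimum, based on keys the template above always emits.
REQUIRED_KEYS = ("name", "rule_id", "type", "language", "query")


def check_ndjson(text: str) -> List[str]:
    """Return a list of problems found, one entry per offending line."""
    problems = []
    for number, line in enumerate(text.splitlines(), start=1):
        if not line.strip():
            continue  # blank lines are ignored
        try:
            rule = json.loads(line)
        except json.JSONDecodeError as exc:
            problems.append(f"line {number}: invalid JSON ({exc.msg})")
            continue
        if not isinstance(rule, dict):
            problems.append(f"line {number}: expected a JSON object")
            continue
        missing = [key for key in REQUIRED_KEYS if key not in rule]
        if missing:
            problems.append(f"line {number}: missing {', '.join(missing)}")
    return problems


if __name__ == "__main__":
    sample = json.dumps({
        "name": "Example", "rule_id": "example-id", "type": "esql",
        "language": "esql", "query": "from logs-* | limit 1",
    })
    print(check_ndjson(sample) or "no problems found")
```

An empty result only means the file is structurally plausible. Kibana may still reject a rule for reasons this check does not cover.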
Lucene siem_rule_ndjson
To be continued...