pySigma Elasticsearch backend supporting Lucene, ES|QL (with correlations) and EQL queries

pySigma Elasticsearch Backend

This is the Elasticsearch backend for pySigma. It provides the package sigma.backends.elasticsearch with the LuceneBackend class.

It supports the following output formats:

  • default: Lucene queries.
  • dsl_lucene: DSL with embedded Lucene queries.
  • eql: Elastic Event Query Language queries.
  • kibana_ndjson: Kibana NDJSON with Lucene queries.

Further, it contains the following processing pipelines in sigma.pipelines.elasticsearch:

  • ecs_windows in windows submodule: ECS mapping for Windows event logs ingested with Winlogbeat.
  • ecs_windows_old in windows submodule: ECS mapping for Windows event logs ingested with Winlogbeat <= 6.x.
  • ecs_zeek_beats in zeek submodule: Zeek ECS mapping from Elastic.
  • ecs_zeek_corelight in zeek submodule: Zeek ECS mapping from Corelight.
  • zeek_raw in zeek submodule: Zeek raw JSON log field naming.
  • ecs_kubernetes in kubernetes submodule: ECS mapping for Kubernetes audit logs ingested with the Kubernetes integration.

This backend is currently maintained by:

Further maintainers required! Send a message to Thomas if you want to co-maintain this backend.

Formats vs. Query Post Processing

The built-in formats aim to provide the minimum compatible output, so they can't fit everyone's needs. This gap is filled by a feature called "query post processing", available since pySigma v0.10.

For further information please read "Introducing Query Post-Processing and Output Finalization to Processing Pipelines".
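
To make the idea concrete, here is a minimal Python sketch of what a template-based post-processing step does conceptually: it takes the finished query plus rule metadata and wraps them in a custom output document. It uses string.Template from the standard library as a stand-in for the Jinja2 templates that pySigma pipelines use. The WRAPPER layout and the postprocess function are hypothetical and not part of pySigma.

```python
import json
from string import Template

# Hypothetical output wrapper; a real pipeline would define this as a Jinja2 template.
WRAPPER = Template('{"title": $title, "query": $query}')


def postprocess(title: str, query: str) -> str:
    """Wrap a converted query and its rule title in a small JSON document."""
    # json.dumps adds the surrounding quotes and escapes quotes and backslashes.
    return WRAPPER.substitute(title=json.dumps(title), query=json.dumps(query))


if __name__ == "__main__":
    print(postprocess("Example", 'user.name:"admin"'))
```

Serializing each value with json.dumps before substitution keeps quotes and backslashes in the query from breaking the surrounding JSON.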

Lucene Kibana NDJSON

Instead of the built-in format (-t lucene -f kibana_ndjson), you can use the following query post-processing pipeline to get the same output, or take it as a starting point for your own customizations.

# lucene-kibana-ndjson.yml
postprocessing:
- type: template
  template: |+
    {"id": "{{ rule.id }}", "type": "search", "attributes": {"title": "SIGMA - {{ rule.title }}", "description": "{{ rule.description }}", "hits": 0, "columns": [], "sort": ["@timestamp", "desc"], "version": 1, "kibanaSavedObjectMeta": {"searchSourceJSON": "{\"index\": \"beats-*\", \"filter\": [], \"highlight\": {\"pre_tags\": [\"@kibana-highlighted-field@\"], \"post_tags\": [\"@/kibana-highlighted-field@\"], \"fields\": {\"*\": {}}, \"require_field_match\": false, \"fragment_size\": 2147483647}, \"query\": {\"query_string\": {\"query\": \"{{ query }}\", \"analyze_wildcard\": true}}}"}}, "references": [{"id": "beats-*", "name": "kibanaSavedObjectMeta.searchSourceJSON.index", "type": "index-pattern"}]}

Use this pipeline with: -t lucene -p lucene-kibana-ndjson.yml but now without -f kibana_ndjson.
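
The template above places the query inside searchSourceJSON, which is a JSON document stored as a string inside the outer JSON object. After customizing the template, it can help to confirm that every output line still parses at both levels. The following is a minimal sketch using only the Python standard library; the function name and the sample values are made up for illustration.

```python
import json


def summarize_saved_searches(ndjson_text: str) -> list[tuple[str, str]]:
    """Return (title, query) for every saved-search line in the NDJSON text."""
    results = []
    for line in ndjson_text.splitlines():
        if not line.strip():
            continue  # skip blank lines between objects
        attributes = json.loads(line)["attributes"]
        # searchSourceJSON is itself JSON, so it needs a second parse.
        source = json.loads(attributes["kibanaSavedObjectMeta"]["searchSourceJSON"])
        results.append((attributes["title"], source["query"]["query_string"]["query"]))
    return results


# Illustrative sample line with the same nesting as the template output.
sample_source = {
    "index": "beats-*",
    "query": {"query_string": {"query": "process.name:cmd.exe", "analyze_wildcard": True}},
}
sample_line = json.dumps(
    {
        "id": "example-id",
        "type": "search",
        "attributes": {
            "title": "SIGMA - Example",
            "kibanaSavedObjectMeta": {"searchSourceJSON": json.dumps(sample_source)},
        },
    }
)

if __name__ == "__main__":
    print(summarize_saved_searches(sample_line + "\n"))
```

A json.JSONDecodeError raised here points to a line whose escaping was broken by the template or by special characters in the rule.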

Lucene Kibana SIEM Rule

Instead of the built-in format (-t lucene -f siem_rule), you can use the following query post-processing pipeline to get the same output, or take it as a starting point for your own customizations.

# lucene-kibana-siemrule.yml
vars:
  index_names: 
    - "apm-*-transaction*"
    - "auditbeat-*"
    - "endgame-*"
    - "filebeat-*"
    - "logs-*"
    - "packetbeat-*"
    - "traces-apm*"
    - "winlogbeat-*"
    - "-*elastic-cloud-logs-*"
  schedule_interval: 5
  schedule_interval_unit: m
postprocessing:
- type: template
  template: |+
    {
      "name": "SIGMA - {{ rule.title }}",
      "consumer": "siem",
      "enabled": true,
      "throttle": null,
      "schedule": {
        "interval": "{{ pipeline.vars.schedule_interval }}{{ pipeline.vars.schedule_interval_unit }}"
      },
      "params": {
        "author": [
        {% if rule.author is string -%}
          "{{rule.author}}"
        {% else %}
        {% for a in rule.author -%}
          "{{ a }}"{% if not loop.last %},{%endif%}
        {% endfor -%}
        {% endif -%} 
        ],
        "description": "{{ rule.description }}",
        "ruleId": "{{ rule.id }}",
        "falsePositives": {{ rule.falsepositives | tojson }},
        "from": "now-{{ pipeline.vars.schedule_interval }}{{ pipeline.vars.schedule_interval_unit }}",
        "immutable": false,
        "license": "DRL",
        "outputIndex": "",
        "meta": {
          "from": "1m"
        },
        "maxSignals": 100,
        {#- The backend.* lookups assume the backend object is available in the template context, as in the ES|QL template. -#}
        "riskScore": {{ backend.severity_risk_mapping[rule.level.name] if rule.level is not none else 21 }},
        "riskScoreMapping": [],
        "severity": "{{ rule.level.name | string | lower if rule.level is not none else 'low' }}",
        "severityMapping": [],
        "threat": {{ backend.finalize_output_threat_model(rule.tags) | list | tojson }},
        "to": "now",
        "references": {{ rule.references |tojson(indent=6)}},
        "version": 1,
        "exceptionsList": [],
        "relatedIntegrations": [],
        "requiredFields": [],
        "setup": "",
        "type": "query",
        "language": "lucene",
        "index": {{ pipeline.vars.index_names | tojson(indent=6)}},
        "query": "{{ query }}",
        "filters": []
      },
      "rule_type_id": "siem.queryRule",
      "tags": [
        {% for n in rule.tags -%}
        "{{ n.namespace }}-{{ n.name }}"{% if not loop.last %},{%endif%}
      {% endfor -%}
      ],
      "notify_when": "onActiveAlert",
      "actions": []
    }

Use this pipeline with: -t lucene -p lucene-kibana-siemrule.yml but now without -f siem_rule.
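
The template above passes list-valued fields such as rule.references and pipeline.vars.index_names through the tojson filter. The following is a minimal Python sketch of why that matters, using only the standard library (the sample URL is a placeholder). The default string form of a Python list uses single quotes, which JSON parsers reject, while JSON serialization yields a valid array.

```python
import json

references = ["https://example.com/report"]  # placeholder value for illustration

as_python_repr = str(references)  # what plain string interpolation of a list produces
as_json = json.dumps(references)  # what a JSON-serializing filter produces


def is_valid_json(text: str) -> bool:
    """Report whether the text parses as JSON."""
    try:
        json.loads(text)
    except json.JSONDecodeError:
        return False
    return True


if __name__ == "__main__":
    print(as_python_repr, is_valid_json(as_python_repr))
    print(as_json, is_valid_json(as_json))
```

The same consideration applies to any list or dictionary that a customized template writes into the rule body.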

EQL siem_rule_ndjson

# eql-siemrule-ndjson.yml
vars:
  schedule_interval: 5
  schedule_interval_unit: m
postprocessing:
  - type: template
    template: |+
      {%- set tags = [] -%}
      {% for n in rule.tags %}
        {%- set tag_string = n.namespace ~ '-' ~ n.name -%}
        {%- set tags=tags.append(tag_string) -%}
      {% endfor %}

      {%- set rule_data = {
        "name": rule.title,
        "id": rule.id | lower,
        "author": [rule.author] if rule.author is string else rule.author or "",
        "description": rule.description if rule.description else "empty description",
        "references": rule.references,
        "enabled": true,
        "interval": pipeline.vars.schedule_interval|string ~ pipeline.vars.schedule_interval_unit,
        "from": "now-" ~ pipeline.vars.schedule_interval|string ~ pipeline.vars.schedule_interval_unit,
        "rule_id": rule.id | lower,
        "false_positives": rule.falsepositives,
        "immutable": false,
        "output_index": "",
        "meta": {
          "from": "1m"
        },
        "risk_score": rule.custom_attributes.risk_score | default(21),
        "severity": rule.level.name | string | lower if rule.level is not none else 'low',
        "threat": rule.custom_attributes.threat | default([]),
        "severity_mapping": [],
        "to": "now",
        "version": 1,
        "max_signals": 100,
        "exceptions_list": [],
        "setup": "",
        "type": "eql",
        "note": "",
        "license": "DRL",
        "language": "eql",
        "query": query,
        "tags": tags,
        "index": pipeline.state.index,
        "actions": [],
        "related_integrations": [],
        "required_fields": [],
        "risk_score_mapping": []
      }
      -%}
      
      {{ rule_data | tojson }}

Use this pipeline with: -t eql -p eql-siemrule-ndjson.yml but now without -f siem_rule_ndjson. The output can be imported directly into Kibana as a Detection Rule.
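
For readers less familiar with Jinja2, the tag loop and the author expression in the template above can be mirrored in plain Python. This is only an illustrative sketch: format_tags and normalize_author are hypothetical helper names, and the tags are given as plain "namespace.name" strings rather than pySigma tag objects.

```python
def format_tags(tags: list[str]) -> list[str]:
    """Turn 'namespace.name' Sigma tags into 'namespace-name' strings."""
    formatted = []
    for tag in tags:
        # Split on the first dot only, so a name such as 't1059.001' stays intact.
        namespace, _, name = tag.partition(".")
        formatted.append(f"{namespace}-{name}")
    return formatted


def normalize_author(author):
    """Mirror: [rule.author] if rule.author is string else rule.author or ""."""
    if isinstance(author, str):
        return [author]
    return author or ""


if __name__ == "__main__":
    print(format_tags(["attack.execution", "attack.t1059.001"]))
    print(normalize_author("Jane Doe"))
```

Note that a rule without an author yields an empty string rather than an empty list, exactly as the template expression does.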

ESQL siem_rule_ndjson

# esql-siemrule-ndjson.yml
vars:
  schedule_interval: 5
  schedule_interval_unit: m
postprocessing:
  - type: template
    template: |+
      {%- set tags = [] -%}
      {% for n in rule.tags %}
        {%- set tag_string = n.namespace ~ '-' ~ n.name -%}
        {%- set tags=tags.append(tag_string) -%}
      {% endfor %}
      {%- set rule_data = {
        "name": rule.title,
        "id": rule.id | lower,
        "author": [rule.author] if rule.author is string else rule.author,
        "description": rule.description,
        "references": rule.references,
        "enabled": true,
        "interval": pipeline.vars.schedule_interval|string ~ pipeline.vars.schedule_interval_unit,
        "from": "now-" ~ pipeline.vars.schedule_interval|string ~ pipeline.vars.schedule_interval_unit,
        "rule_id": rule.id | lower,
        "false_positives": rule.falsepositives,
        "immutable": false,
        "output_index": "",
        "meta": {
          "from": "1m"
        },
        "risk_score": backend.severity_risk_mapping[rule.level.name] if rule.level is not none else 21,
        "severity": rule.level.name | string | lower if rule.level is not none else "low",
        "severity_mapping": [],
        "threat": backend.finalize_output_threat_model(rule.tags) | list,
        "to": "now",
        "version": 1,
        "max_signals": 100,
        "exceptions_list": [],
        "setup": "",
        "type": "esql",
        "note": "",
        "license": "DRL",
        "language": "esql",
        "index": pipeline.vars.index_names | list,
        "query": query,
        "tags": tags,
        "actions": [],
        "related_integrations": [],
        "required_fields": [],
        "risk_score_mapping": []
      }
      -%}
      
      {{ rule_data | tojson }}

Use this pipeline with: -t esql -p esql-siemrule-ndjson.yml but now without -f siem_rule_ndjson. The output can be imported directly into Kibana as a Detection Rule.

Lucene siem_rule_ndjson

To be continued...
