Panther SDK

The Panther SDK module allows you to configure detections for your Panther instance.

Getting Started

Install

The Panther SDK can be installed using pip.

pip3 install panther-sdk==0.0.24

Writing a detection

You can use the detection module in the Panther SDK to configure detections. The example below shows a detection that processes an HTTP log.

Assuming we have a log type Internal.HTTP.Traffic that has the following shape:

{
    "method": "POST",
    "useSSL": false,
    "path": "/api/some/endpoint",
    "host": "internal.megacorp.com"
}

We can write a detection to check for insecure POST and PUT requests:

# import the "detection" module, as we'll be creating a rule.
from panther_sdk import detection

# The next five functions are used in the detection definition that follows. Panther
# detections allow parts of a detection to be defined with custom Python code.


# "filter_insecure" is an example of a Python function that will be used to create
# a detection.PythonFilter that only matches insecure events. The event argument will
# be a Python dict object corresponding to the JSON event.
def filter_insecure(event, params):
    return event["useSSL"] == False


# "filter_http_methods" is a slightly more complicated python function that we will use
# to create a detection.PythonFilter later. This function uses the second "params" argument
# to make the function's logic reusable.
def filter_http_methods(event, params):
    allowed_methods = params["methods"]
    actual_method = event["method"]
    return actual_method in allowed_methods


# "reference_generator" is another example where we will use some python code to dynamically
# change the information Panther presents on an Alert resulting from a Detection match. In
# this example, we're populating a query param with some relevant information from 
# the contextual event.
def reference_generator(event):
    host = event["host"]
    return f"https://wiki.internal.megacorp.com/hosts_db?host={host}"


# "make_context" will be used to attach arbitrary data to the resulting Alert. This
# can be used to simply change how data is presented or be used to push machine readable information
# to a downstream Alert Destination.
def make_context(event):
    # The Panther SDK's hooks to arbitrary Python come with some restrictions. When the
    # detection is saved, the Panther backend captures the source of the provided function to use
    # in realtime log data processing and other backend processes. Because of this, functions used
    # to define a detection must not reference anything that is not local to the function body. Therefore,
    # imports targeting any of the below _must_ be placed inside the function body:
    #
    #   - The python standard library
    #   - Libraries that are enabled on your Panther instance
    #   - Panther "Global Helpers" that you have configured on your instance
    
    import fnmatch 
    
    path = event["path"]
    return {
        "is_api_path": fnmatch.fnmatch(path, "/api/*")
    }


# "pick_severity" is a custom function we are using to define the severity of Alerts based on
# the data present in the current log. The detection.Rule declaration below shows how this method
# is registered with the definition of our Rule.
def pick_severity(event) -> str:
    from panther_sdk import detection  # required. see the notes inside "make_context"

    # Traffic addressed to any host other than the internal one is treated as more severe.
    if event["host"] != "internal.megacorp.com":
        return detection.SeverityMedium

    return detection.SeverityInfo


# Declare a rule. Every call to "detection.Rule" in your repo will create a Rule in the Panther backend.
# This example uses a subset of the fields available to define a Rule. See "Full Dataclass API" for
# all the available fields.
detection.Rule(
    # Give the Rule an id. This id must be unique on your Panther
    # instance. Dot-separated names following a "namespace" pattern are recommended.
    rule_id="Internal.HTTP.Traffic.Insecure",
    
    # Pick a human friendly name for the rule. This will be used
    # to present the Rule in the Panther Console UI
    name="Detect insecure internal HTTP traffic",
    
    # Specify one or more log types for your Rule. This setting is what
    # tells the Panther backend to run this Rule against the log data depicted above.
    log_types=["Internal.HTTP.Traffic"],
    
    # Specify an enabled state for the Rule. Detections can be uploaded in a
    # disabled state and will not begin processing log data until enabled.
    enabled=True,
    
    # Optionally, you can specify a list of tags to associate with the detection.
    tags=["internal"],

    # The "filters" list defines the sequence of matching logic that an event
    # will be tested against to determine whether there is a match. Returning "True" proceeds to the
    # next check. If the final check returns "True", there is a match.
    filters=[
        # If the HTTP transaction was secure, we're not interested in that log in this detection.
        # Therefore, the first filter uses the "filter_insecure" function we defined to filter to only
        # events that have useSSL set to "false". 
        detection.PythonFilter(func=filter_insecure),
        
        # For now, we're only interested in getting alerted for insecure POST and PUT requests.
        # We therefore define the second filter step using "filter_http_methods". Unlike "filter_insecure",
        # we pass some parameters to this function. 
        detection.PythonFilter(func=filter_http_methods, params={'methods': ["POST", "PUT"]}),
    ],

    # On this line, we're defining the severity that should be associated with any resulting alerts.
    # We have the option of defining a static severity (simply: severity="INFO") but, in this case, we want
    # to make the severity dynamic based on data from the event. To do this, we reference the "pick_severity"
    # function we defined at the top of the file. We also provide a fallback value in case the dynamic function
    # cannot be processed. This fallback will also be the severity value displayed in the Panther Console UI.
    severity=detection.DynamicStringField(
        func=pick_severity,
        fallback=detection.SeverityInfo,
    ),

    # Below we specify the "reference" field. This optional field is used to attach a link to any Alerts 
    # resulting from this detection. Similar to "severity" we can make this field dynamic based on 
    # event data. There is also a fallback value specified.
    reference=detection.DynamicStringField(
        func=reference_generator,
        fallback="https://wiki.internal.megacorp.com/hosts_db",
    ),
    
    # Fields like "runbook" and "description" are also available to further customize how any 
    # resulting Alerts will be displayed. These can have dynamic handlers defined, but in this 
    # example static string values will work just fine.
    runbook="Optional runbook content",
    description="A helpful description",
    
    # Finally, we use the optional "alert_context" field to tell the Panther backend to use our 
    # "make_context" function to generate custom data that will be attached to any resulting Alerts.
    alert_context=make_context,
)
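
Before uploading, the custom functions can be exercised locally against the sample event. The block below is a minimal standalone sketch, not part of the SDK: `FILTERS` and `matches` are hypothetical stand-ins for the rule's "filters" list, and the sketch assumes each filter is called with the event and its params, in order, with a match only when every filter returns True.

```python
# Standalone dry run: panther_sdk is not imported because the filter
# functions are plain Python callables that receive a dict-like event.

def filter_insecure(event, params):
    return event["useSSL"] is False


def filter_http_methods(event, params):
    return event["method"] in params["methods"]


def make_context(event):
    import fnmatch  # local import, mirroring the restriction described above

    return {"is_api_path": fnmatch.fnmatch(event["path"], "/api/*")}


# Hypothetical stand-in for the rule's "filters" list: (function, params) pairs.
FILTERS = [
    (filter_insecure, None),
    (filter_http_methods, {"methods": ["POST", "PUT"]}),
]


def matches(event):
    # An event matches only if every filter, evaluated in order, returns True.
    return all(func(event, params) for func, params in FILTERS)


sample = {
    "method": "POST",
    "useSSL": False,
    "path": "/api/some/endpoint",
    "host": "internal.megacorp.com",
}
secure = dict(sample, useSSL=True)
get_request = dict(sample, method="GET")

print(matches(sample))       # True
print(matches(secure))       # False
print(matches(get_request))  # False
print(make_context(sample))  # {'is_api_path': True}
```

Checking each function with plain dicts like this catches key errors and logic slips before the detection reaches your Panther instance.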

Rules support overrides, which are helpful when writing factory functions for your detections. The factory provides defaults for the detection, and callers can customize them when the function is called. For example:

from typing import Optional

from panther_sdk import detection

# This is an example of a factory rule function that might live in panther_detections or your own library of detections.
# This function accepts an optional overrides object, which is passed through to the Rule instantiation. Any attributes
# set on the RuleOverrides object will override the attributes defined for the rule.
def factory_rule_func(overrides: Optional[detection.RuleOverrides] = None) -> detection.Rule:
    return detection.Rule(
        rule_id="default.rule.id",
        filters=[...],
        log_types=["AWS.ALB"],
        severity=detection.SeverityMedium,
        overrides=overrides,
    )


# Here is an example of using the rule factory function.
def make_detections() -> None:
    # We can pass no overrides at all, and it will be the default rule the function creates
    r = factory_rule_func()
    print(r)

    # We can override the id and severity to create a more customized rule using our function from above
    r = factory_rule_func(overrides=detection.RuleOverrides(
        rule_id="not.important.rule",
        severity=detection.SeverityInfo,
    ))
    print(r)

    r = factory_rule_func(overrides=detection.RuleOverrides(
        rule_id="very.important.rule",
        severity=detection.SeverityHigh,
    ))
    print(r)


# Now we call our main detection making function to execute it when running this python script
make_detections()
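
The override pattern itself can be illustrated without the SDK. The following is a toy sketch under stated assumptions: `ToyRule`, `ToyOverrides`, and `apply_overrides` are hypothetical names, the severity strings are placeholders, and the merge logic is one plausible way to implement "set attributes win over defaults", not the SDK's actual implementation.

```python
from dataclasses import dataclass, fields, replace
from typing import Optional


@dataclass(frozen=True)
class ToyRule:
    rule_id: str
    severity: str


@dataclass(frozen=True)
class ToyOverrides:
    # None means "not set": the rule's default is kept.
    rule_id: Optional[str] = None
    severity: Optional[str] = None


def apply_overrides(rule: ToyRule, overrides: Optional[ToyOverrides]) -> ToyRule:
    if overrides is None:
        return rule
    # Collect only the attributes that were explicitly set on the overrides object.
    changes = {
        f.name: getattr(overrides, f.name)
        for f in fields(overrides)
        if getattr(overrides, f.name) is not None
    }
    return replace(rule, **changes)


default = ToyRule(rule_id="default.rule.id", severity="MEDIUM")
print(apply_overrides(default, None))
# ToyRule(rule_id='default.rule.id', severity='MEDIUM')
print(apply_overrides(default, ToyOverrides(severity="HIGH")))
# ToyRule(rule_id='default.rule.id', severity='HIGH')
```

Treating unset attributes as "keep the default" is what lets one factory serve many callers with small, targeted customizations.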

Similar to overrides, you can add extensions to your Panther content. Extensions are passed the same way as overrides but apply to list-valued fields: the values you supply are added to the rule's defaults instead of replacing them.

from typing import Optional

from panther_sdk import detection

# This is an example of a factory rule function that might live in panther_detections or your own library of detections.
# This function accepts an optional extensions object, which is passed through to the Rule instantiation. Any list attributes
# set on the RuleExtensions object will extend the corresponding attributes defined for the rule.
def factory_rule_func(extensions: Optional[detection.RuleExtensions] = None) -> detection.Rule:
    return detection.Rule(
        rule_id="default.rule.id",
        filters=[...],
        log_types=["AWS.ALB"],
        severity=detection.SeverityMedium,
        extensions=extensions,
    )


# Here is an example of using the rule factory function.
def make_detections() -> None:
    # We can pass no extensions at all, and it will be the default rule the function creates
    r = factory_rule_func()
    print(r)

    # We can extend filters to add logic to the rule ("my_very_important_filter" is assumed to be defined elsewhere)
    r = factory_rule_func(extensions=detection.RuleExtensions(
        filters=[detection.PythonFilter(func=my_very_important_filter)],
    ))
    print(r)


# Now we call our main detection making function to execute it when running this python script
make_detections()
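
The difference from overrides is that list fields grow instead of being replaced. A minimal sketch of that idea, assuming extension filters are appended after the defaults (`extend_filters`, `is_post`, and `is_insecure` are hypothetical helpers, not SDK functions):

```python
from typing import Callable, List, Optional

Filter = Callable[[dict], bool]


def extend_filters(defaults: List[Filter], extra: Optional[List[Filter]] = None) -> List[Filter]:
    # Return a new list so the factory's defaults are never mutated.
    return list(defaults) + list(extra or [])


def is_post(event: dict) -> bool:
    return event["method"] == "POST"


def is_insecure(event: dict) -> bool:
    return event["useSSL"] is False


base = [is_post]
extended = extend_filters(base, [is_insecure])

event = {"method": "POST", "useSSL": True}
print(all(f(event) for f in base))      # True
print(all(f(event) for f in extended))  # False
print(len(base), len(extended))         # 1 2
```

Because every filter must pass, adding filters through an extension can only narrow what the default rule matches.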

Full Dataclass API

detection module

DynamicStringField

Make a field dynamic based on the detection input

| Field | Description | Type |
| --- | --- | --- |
| fallback | Fallback value in case the dynamic handler fails | str |
| func | Dynamic handler | Optional[Callable[[PantherEvent], str]] |

DynamicDestinations

Make destinations dynamic based on the detection input

| Field | Description | Type |
| --- | --- | --- |
| fallback | Fallback value in case the dynamic handler fails | Optional[List[str]] |
| func | Dynamic handler | Callable[[PantherEvent], List[str]] |

AlertGrouping

Configuration for how an alert is grouped

| Field | Description | Type |
| --- | --- | --- |
| period_minutes | How long matches should be grouped into an alert after the first match | int |
| group_by | Function to generate a key for grouping matches | Optional[Callable[[PantherEvent], str]] |
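
To make the two fields concrete, here is a toy simulation of grouping. It is a sketch based only on the descriptions above, not Panther's implementation: `group_matches` is a hypothetical helper, events carry a made-up minute offset, and the exclusive window boundary is an assumption.

```python
from typing import Callable, Dict, List, Tuple


def group_matches(
    matches: List[Tuple[int, dict]],
    group_by: Callable[[dict], str],
    period_minutes: int,
) -> List[Tuple[str, int, int]]:
    """Return (key, first_match_minute, match_count) for each simulated alert.

    `matches` holds (minute, event) pairs sorted by minute.
    """
    open_alerts: Dict[str, int] = {}  # grouping key -> index of its latest alert
    alerts: List[list] = []
    for minute, event in matches:
        key = group_by(event)
        idx = open_alerts.get(key)
        if idx is not None and minute - alerts[idx][1] < period_minutes:
            # Still inside the window that opened at the first match: same alert.
            alerts[idx][2] += 1
        else:
            # No open window for this key: this match starts a new alert.
            open_alerts[key] = len(alerts)
            alerts.append([key, minute, 1])
    return [tuple(a) for a in alerts]


matches = [
    (0, {"host": "a"}),
    (10, {"host": "a"}),
    (20, {"host": "b"}),
    (70, {"host": "a"}),
]
print(group_matches(matches, lambda e: e["host"], period_minutes=60))
# [('a', 0, 2), ('b', 20, 1), ('a', 70, 1)]
```

In this simulation a shorter `period_minutes` yields more, smaller alerts, while a coarser `group_by` key merges more matches into each alert.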

PythonFilter

Create a filter by referencing a python function

| Field | Description | Type |
| --- | --- | --- |
| func | Provide a function whose python source will be used as the filter definition | Callable[[PantherEvent], bool] |

UnitTestMock

Mock for a unit test

| Field | Description | Type |
| --- | --- | --- |
| name | name of the object to mock | str |
| return_value | string to assign as the return value for the mock | str |

JSONUnitTest

Unit test with json content

| Field | Description | Type |
| --- | --- | --- |
| name | name of the unit test | str |
| expect_match | whether the data should match and trigger an alert | bool |
| data | json string | str |
| mocks | list of mocks to use in the test | Optional[List[UnitTestMock]] |
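
The relationship between `data` and `expect_match` can be sketched in plain Python. This is an illustration only: `ToyUnitTest`, `rule_matches`, and `run_test` are hypothetical, and the real test runner is part of Panther, not this code.

```python
import json
from dataclasses import dataclass


@dataclass
class ToyUnitTest:
    name: str
    expect_match: bool
    data: str  # JSON string, as in the table above


def rule_matches(event: dict) -> bool:
    # Same logic as the example rule: insecure POST or PUT requests.
    return event["useSSL"] is False and event["method"] in ("POST", "PUT")


def run_test(test: ToyUnitTest) -> bool:
    # A test passes when the rule's verdict equals the declared expectation.
    event = json.loads(test.data)
    return rule_matches(event) == test.expect_match


tests = [
    ToyUnitTest("insecure POST matches", True, json.dumps({"method": "POST", "useSSL": False})),
    ToyUnitTest("secure POST is ignored", False, json.dumps({"method": "POST", "useSSL": True})),
]
results = {t.name: run_test(t) for t in tests}
print(results)
# {'insecure POST matches': True, 'secure POST is ignored': True}
```

Pairing at least one matching and one non-matching event per rule covers both directions of the filter logic.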

Policy

Define a Policy-type detection to execute against resources of the given resource types in your Panther instance

| Field | Description | Type |
| --- | --- | --- |
| policy_id | ID for the Policy | str |
| ignore_patterns | Patterns of resource ids to ignore for the policy | Optional[Union[str, List[str]]] |
| destinations | Alert destinations for the policy | Optional[Union[str, List[str], DynamicDestinations]] |
| filters | Define event filters for the policy | Union[Union[PythonFilter], List[Union[PythonFilter]]] |
| enabled | Whether the policy is enabled or not | bool |
| resource_types | What resource types this policy will apply to | Union[str, List[str]] |
| severity | What severity alerts generated from this policy get assigned | Union[str, DynamicStringField] |
| name | What name to display in the UI and alerts. The PolicyID will be displayed if this field is not set. | Optional[str] |
| description | Description for the policy | Optional[Union[str, DynamicStringField]] |
| reference | The reason this policy exists, often a link to documentation | Optional[Union[str, DynamicStringField]] |
| reports | A mapping of framework or report names to values this policy covers for that framework | Optional[Dict[str, List[str]]] |
| runbook | The actions to be carried out if this policy fails, often a link to documentation | Optional[Union[str, DynamicStringField]] |
| tags | Tags used to categorize this policy | Optional[Union[str, List[str]]] |
| unit_tests | Unit tests for this policy | Optional[Union[Union[JSONUnitTest], List[Union[JSONUnitTest]]]] |
| alert_title | Title to use in the alert | Optional[Callable[[PantherEvent], str]] |
| alert_context | Optional JSON to attach to alerts generated by this policy | Optional[Callable[[PantherEvent], Dict[str, Any]]] |
| alert_grouping | Configuration for how an alert is grouped | Optional[AlertGrouping] |

Rule

Define a Rule-type detection to execute against log data in your Panther instance

| Field | Description | Type |
| --- | --- | --- |
| rule_id | ID for the rule | str |
| severity | Severity for the rule | Union[str, DynamicStringField] |
| threshold | Number of matches received before an alert is triggered | int |
| name | Display name for the rule | Optional[str] |
| log_types | Log Types to associate with this rule | Union[str, List[str]] |
| filters | Define event filters for the rule | Union[Union[PythonFilter], List[Union[PythonFilter]]] |
| enabled | Whether the rule is enabled or not | bool |
| unit_tests | Unit tests for the rule | Optional[Union[Union[JSONUnitTest], List[Union[JSONUnitTest]]]] |
| tags | Tags for the rule | Optional[Union[str, List[str]]] |
| reference | Reference for the rule | Optional[Union[str, DynamicStringField]] |
| runbook | Runbook for the rule | Optional[Union[str, DynamicStringField]] |
| description | Description for the rule | Optional[Union[str, DynamicStringField]] |
| summary_attrs | Summary Attributes for the rule | Optional[List[str]] |
| reports | Report mappings for the rule | Optional[Dict[str, List[str]]] |
| destinations | Alert destinations for the rule | Optional[Union[str, List[str], DynamicDestinations]] |
| alert_title | Title to use in the alert | Optional[Callable[[PantherEvent], str]] |
| alert_context | Optional JSON to attach to alerts generated by this rule | Optional[Callable[[PantherEvent], Dict[str, Any]]] |
| alert_grouping | Configuration for how an alert is grouped | Optional[AlertGrouping] |

ScheduledRule

Define a ScheduledRule-type detection to execute against query results in your Panther instance

| Field | Description | Type |
| --- | --- | --- |
| rule_id | ID for the scheduled rule | str |
| severity | What severity alerts generated from this scheduled rule get assigned | Union[str, DynamicStringField] |
| threshold | Number of matches received before an alert is triggered | int |
| name | Display name for the scheduled rule | Optional[str] |
| scheduled_queries | Scheduled queries to associate with this scheduled rule | Union[str, List[str]] |
| filters | Define event filters for the scheduled rule | Union[Union[PythonFilter], List[Union[PythonFilter]]] |
| enabled | Whether the scheduled rule is enabled or not | bool |
| unit_tests | Unit tests for the scheduled rule | Optional[Union[Union[JSONUnitTest], List[Union[JSONUnitTest]]]] |
| tags | Tags for the scheduled rule | Optional[Union[str, List[str]]] |
| reference | Reference for the scheduled rule | Optional[Union[str, DynamicStringField]] |
| runbook | Runbook for the scheduled rule | Optional[Union[str, DynamicStringField]] |
| description | Description for the scheduled rule | Optional[Union[str, DynamicStringField]] |
| summary_attrs | Summary Attributes for the scheduled rule | Optional[List[str]] |
| reports | Report mappings for the scheduled rule | Optional[Dict[str, List[str]]] |
| destinations | Alert destinations for the scheduled rule | Optional[Union[str, List[str], DynamicDestinations]] |
| alert_title | Title to use in the alert | Optional[Callable[[PantherEvent], str]] |
| alert_context | Optional JSON to attach to alerts generated by this scheduled rule | Optional[Callable[[PantherEvent], Dict[str, Any]]] |
| alert_grouping | Configuration for how an alert is grouped | Optional[AlertGrouping] |

query module

CronSchedule

Cron expression based schedule definition for a query

| Field | Description | Type |
| --- | --- | --- |
| expression | Defines how often queries using this schedule run | str |
| timeout_minutes | Defines the timeout applied to queries with this schedule | int |

IntervalSchedule

Interval based schedule definition for a query

| Field | Description | Type |
| --- | --- | --- |
| rate_minutes | Defines how often queries using this schedule run | int |
| timeout_minutes | Defines the timeout applied to queries with this schedule | int |

Query

A saved or scheduled query

| Field | Description | Type |
| --- | --- | --- |
| name | Unique name for the query | str |
| description | Short description for the query | str |
| default_database | Default database for the query | str |
| sql | SQL statement | str |
| enabled | Whether the query is enabled or not | bool |
| tags | Tags for the query | Optional[Union[str, List[str]]] |
| schedule | Schedule attached to the query | Optional[Union[IntervalSchedule, CronSchedule]] |

schema module

DataModelMapping

| Field | Description | Type |
| --- | --- | --- |
| name | Name of the data model field. This will be the name used when accessing the field from within detections. | str |
| path | Path to the target value in the Panther Event. This can be a simple field name or complete JSON path starting with a $. JSON path syntax must be compatible with the jsonpath-ng Python package. | Optional[str] |
| func | A Python function to access the target value. The input is the Panther Event and output is the target value in the Panther Event. | Optional[Callable[[PantherEvent], Any]] |
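
The two ways of locating a value, `path` and `func`, can be sketched as follows. This is a simplified illustration, not the SDK's resolver: `resolve_mapping` is a hypothetical helper, it assumes exactly one of the two fields is provided, and it handles only simple field names (JSON paths starting with `$` are deliberately left out).

```python
from typing import Any, Callable, Optional


def resolve_mapping(
    event: dict,
    path: Optional[str] = None,
    func: Optional[Callable[[dict], Any]] = None,
) -> Any:
    # Assumption of this sketch: a mapping supplies either a path or a function.
    if (path is None) == (func is None):
        raise ValueError("provide exactly one of path or func")
    if func is not None:
        return func(event)
    if path.startswith("$"):
        raise NotImplementedError("JSON path lookups are out of scope for this sketch")
    # Simple field name: read the top-level key, or None if it is absent.
    return event.get(path)


event = {"host": "internal.megacorp.com", "path": "/api/some/endpoint"}

print(resolve_mapping(event, path="host"))
# internal.megacorp.com
print(resolve_mapping(event, func=lambda e: e["path"].split("/")[1]))
# api
```

A `path` suits values that already exist in the event, while a `func` suits values that must be derived from one or more fields.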

DataModel

Data Models provide a way to configure a set of unified fields across all log types.

| Field | Description | Type |
| --- | --- | --- |
| data_model_id | The unique identifier of the data model. | str |
| name | What name to display in the UI and alerts. The data_model_id will be displayed if this field is not set. | Optional[str] |
| enabled | Whether this data model is enabled. | Optional[bool] |
| log_type | What log type this data model will apply to. | str |
| mappings | Mapping from source field name or method to unified data model field name. | Union[DataModelMapping, List[DataModelMapping]] |
