Panther SDK
The Panther SDK module allows you to configure detections for your Panther instance.
Getting Started
Install
The Panther SDK can be installed with pip:
pip3 install panther-sdk==0.0.24
Writing a detection
You can use the detection module in the Panther SDK to configure detections. The example below shows a detection that processes an HTTP log.
Assuming we have a log type Internal.HTTP.Traffic that has the following shape:
{
  "method": "POST",
  "useSSL": false,
  "path": "/api/some/endpoint",
  "host": "internal.megacorp.com"
}
We can write a detection to check for insecure POST and PUT requests:
# import the "detection" module, as we'll be creating a rule.
from panther_sdk import detection
# The next four functions are used in the detection definition that follows. Panther
# detections allow parts of a detection to be defined with custom Python code.

# "filter_insecure" is an example of a Python function that will be used to create
# a detection.PythonFilter that only matches insecure events. The event argument will
# be a Python dict corresponding to the JSON event.
def filter_insecure(event, params):
    return event["useSSL"] == False
# "filter_http_methods" is a slightly more complicated python function that we will use
# to create a detection.PythonFilter later. This function uses the second "params" argument
# to make the function's logic reusable.
def filter_http_methods(event, params):
allowed_methods = params["methods"]
actual_method = event["method"]
return actual_method in allowed_methods
# "reference_generator" is another example where we will use some python code to dynamically
# change the information Panther presents on an Alert resulting from a Detection match. In
# this example, we're populating a query param with some relevant information from
# the contextual event.
def reference_generator(event):
origin = event["origin"]
return f"https://wiki.internal.megacorp.com/hosts_db?host={origin}"
# "make_context" will be used to attach arbitrary data to the resulting Alert. This
# can be used to simply change how data is presented or be used to push machine readable information
# to a downstream Alert Destination.
def make_context(event):
# The Panther SDK's hooks to arbitrary python come with some restrictions. When the
# detection is saved, the Panther backend will capture the source of provided function to use
# in realtime log data processing and other backend processes. Because of this, functions used
# to define a detection must not have references that are not local to the function body. Therefore,
# imports targeting any of the below _must_ be included in the function body:
#
# - The python standard library
# - Libraries that are enabled on your Panther instance
# - Panther "Global Helpers" that you have configured on your instance
import fnmatch
path = event["path"]
return {
"is_api_path": fnmatch.fnmatch(path, "/api/*")
}
# "pick_severity" is a custom function we are using to define the severity of Alerts based on
# the data present in the current log. The detection.Rule declaration below shows how this method
# is registered with the definition of our Rule.
def pick_severity(event) -> str:
from panther_sdk import detection # required. see notes in comment above "make_context"
if event["origin"] != "internal.megacorp.com":
return detection.SeverityInfo
return detection.SeverityInfo
# Declare a rule. Every call to "detection.Rule" in your repo will create a Rule in the Panther backend.
# This example uses a subset of the fields available to define a Rule. See "Full Dataclass API" for
# all the available fields.
detection.Rule(
    # Give the rule an ID. This must be unique on your Panther instance.
    # A dot-separated name following a namespace pattern is recommended.
    rule_id="Internal.HTTP.Traffic.Insecure",
    # Pick a human-friendly name for the rule. This will be used
    # to present the rule in the Panther Console UI.
    name="Detect insecure internal HTTP traffic",
    # Specify one or more log types for your rule. This setting is what
    # tells the Panther backend to run this rule against the log data depicted above.
    log_types=["Internal.HTTP.Traffic"],
    # Specify an enabled state for the rule. Detections can be uploaded in a
    # disabled state and will not begin processing log data until enabled.
    enabled=True,
    # Optionally, you can specify a list of tags to associate with the detection.
    tags=["internal"],
    # The "filters" list defines the sequence of matching logic that an event
    # will be tested against to determine whether there is a match. Returning True proceeds to the
    # next check. If the final check returns True, there is a match.
    filters=[
        # If the HTTP transaction was secure, this detection is not interested in that log.
        # The first filter therefore uses the "filter_insecure" function we defined to narrow down
        # to events that have useSSL set to false.
        detection.PythonFilter(func=filter_insecure),
        # For now, we're only interested in getting alerted on insecure POST and PUT requests.
        # We therefore define the second filter step using "filter_http_methods". Unlike "filter_insecure",
        # we pass some parameters to this function.
        detection.PythonFilter(func=filter_http_methods, params={"methods": ["POST", "PUT"]}),
    ],
    # Here we define the severity that should be associated with any resulting alerts.
    # We have the option of defining a static severity (simply: severity="INFO"), but in this case we want
    # to make the severity dynamic based on data from the event. To do this, we reference the "pick_severity"
    # function we defined above. We also provide a fallback value in case the dynamic function
    # cannot be processed. This fallback is also the severity value displayed in the Panther Console UI.
    severity=detection.DynamicStringField(
        func=pick_severity,
        fallback=detection.SeverityInfo,
    ),
    # Below we specify the "reference" field. This optional field is used to attach a link to any alerts
    # resulting from this detection. Similar to "severity", we can make this field dynamic based on
    # event data. There is also a fallback value specified.
    reference=detection.DynamicStringField(
        func=reference_generator,
        fallback="https://wiki.internal.megacorp.com/hosts_db",
    ),
    # Fields like "runbook" and "description" are also available to further customize how any
    # resulting alerts will be displayed. These can have dynamic handlers defined, but in this
    # example static string values work just fine.
    runbook="Optional runbook content",
    description="A helpful description",
    # Finally, we use the optional "alert_context" field to tell the Panther backend to use our
    # "make_context" function to generate custom data that will be attached to any resulting alerts.
    alert_context=make_context,
)
The SDK supports overrides, which are helpful when writing factory functions for your detections: you can provide defaults for a detection while letting callers customize individual fields when the function is called. For example:
from typing import Optional

from panther_sdk import detection

# This is an example of a factory rule function that might live in panther_detections or your own library of detections.
# This function accepts an optional overrides object, which is passed through to the Rule instantiation. Any attributes
# set on the RuleOverrides object will override the attributes defined for the rule.
def factory_rule_func(overrides: Optional[detection.RuleOverrides] = None) -> detection.Rule:
    return detection.Rule(
        rule_id="default.rule.id",
        filters=[...],
        log_types=["AWS.ALB"],
        severity=detection.SeverityMedium,
        overrides=overrides,
    )

# Here is an example of using the rule factory function.
def make_detections() -> None:
    # We can pass no overrides at all, and we get the default rule the function creates.
    r = factory_rule_func()
    print(r)

    # We can override the ID and severity to create a more customized rule using the function from above.
    r = factory_rule_func(overrides=detection.RuleOverrides(
        rule_id="not.important.rule",
        severity=detection.SeverityInfo,
    ))
    print(r)

    r = factory_rule_func(overrides=detection.RuleOverrides(
        rule_id="very.important.rule",
        severity=detection.SeverityHigh,
    ))
    print(r)

# Now we call our main detection-making function to execute it when running this Python script.
make_detections()
Similar to overrides, you can add extensions to your Panther content. Extensions behave like overrides but apply to fields that take lists, adding to the values the detection already defines rather than replacing them.
from typing import Optional

from panther_sdk import detection

# This is an example of a factory rule function that might live in panther_detections or your own library of detections.
# This function accepts an optional extensions object, which is passed through to the Rule instantiation. List
# attributes set on the RuleExtensions object are added to the corresponding attributes defined for the rule.
def factory_rule_func(extensions: Optional[detection.RuleExtensions] = None) -> detection.Rule:
    return detection.Rule(
        rule_id="default.rule.id",
        filters=[...],
        log_types=["AWS.ALB"],
        severity=detection.SeverityMedium,
        extensions=extensions,
    )

# Here is an example of using the rule factory function.
def make_detections() -> None:
    # We can pass no extensions at all, and we get the default rule the function creates.
    r = factory_rule_func()
    print(r)

    # We can extend "filters" to add logic to the rule
    # ("my_very_important_filter" is assumed to be a filter function defined elsewhere).
    r = factory_rule_func(extensions=detection.RuleExtensions(
        filters=[detection.PythonFilter(func=my_very_important_filter)],
    ))
    print(r)

# Now we call our main detection-making function to execute it when running this Python script.
make_detections()
Full Dataclass API
detection module
DynamicStringField
Make a field dynamic based on the detection input
Field | Description | Type
---|---|---
fallback | Fallback value in case the dynamic handler fails | str
func | Dynamic handler | Optional[Callable[[PantherEvent], str]]
DynamicDestinations
Make destinations dynamic based on the detection input
Field | Description | Type
---|---|---
fallback | Fallback value in case the dynamic handler fails | Optional[List[str]]
func | Dynamic handler | Callable[[PantherEvent], List[str]]
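For example, alert routing can be made dynamic based on event data. Below is a minimal sketch reusing the sample Internal.HTTP.Traffic event from earlier; the destination names "security-critical" and "security-default" are placeholders, not destinations defined in this document:

from panther_sdk import detection

# Route alerts for traffic from non-internal hosts to a higher-priority destination;
# everything else goes to a default destination. Both destination names are illustrative.
def route_destinations(event):
    if event["host"] != "internal.megacorp.com":
        return ["security-critical"]
    return ["security-default"]

dynamic_destinations = detection.DynamicDestinations(
    func=route_destinations,
    fallback=["security-default"],
)

The resulting object can then be supplied to a rule's or policy's destinations field, which accepts DynamicDestinations per the tables below.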
AlertGrouping
Configuration for how an alert is grouped
Field | Description | Type
---|---|---
period_minutes | How long matches should be grouped into an alert after the first match | int
group_by | Function to generate a key for grouping matches | Optional[Callable[[PantherEvent], str]]
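As a minimal sketch using the two fields above, the grouping below collects matches from the same host into a single alert for an hour after the first match; the grouping function reuses the sample event's host field and is purely illustrative:

from panther_sdk import detection

# Group matches by host for 60 minutes after the first match.
def group_by_host(event):
    return event["host"]

grouping = detection.AlertGrouping(
    period_minutes=60,
    group_by=group_by_host,
)

This object can then be attached to a rule through its alert_grouping field.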
PythonFilter
Create a filter by referencing a python function
Field | Description | Type
---|---|---
func | Provide a function whose Python source will be used as the filter definition | Callable[[PantherEvent], bool]
UnitTestMock
Mock for a unit test
Field | Description | Type
---|---|---
name | Name of the object to mock | str
return_value | String to assign as the return value for the mock | str
JSONUnitTest
Unit test with json content
Field | Description | Type
---|---|---
name | Name of the unit test | str
expect_match | Whether the data should match and trigger an alert | bool
data | JSON string | str
mocks | List of mocks to use in the test | Optional[List[UnitTestMock]]
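As a sketch, a unit test for the insecure-traffic rule from the walkthrough above could look like the following; the JSON payload mirrors the sample event shape, and mocks built from UnitTestMock could be supplied through the mocks field if the detection relied on helpers worth stubbing:

from panther_sdk import detection

insecure_post_test = detection.JSONUnitTest(
    name="insecure POST should match",
    expect_match=True,
    data='{"method": "POST", "useSSL": false, "path": "/api/some/endpoint", "host": "internal.megacorp.com"}',
)

The test would then be passed to the rule through its unit_tests field, for example unit_tests=[insecure_post_test].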
Policy
Define a Policy-type detection to execute against log data in your Panther instance
Field | Description | Type
---|---|---
policy_id | ID for the Policy | str
ignore_patterns | Patterns of resource ids to ignore for the policy | Optional[Union[str, List[str]]]
destinations | Alert destinations for the policy | Optional[Union[str, List[str], DynamicDestinations]]
filters | Define event filters for the policy | Union[Union[PythonFilter], List[Union[PythonFilter]]]
enabled | Whether the policy is enabled or not | bool
resource_types | What resource types this policy will apply to | Union[str, List[str]]
severity | What severity alerts generated from this policy get assigned | Union[str, DynamicStringField]
name | What name to display in the UI and alerts. The PolicyID will be displayed if this field is not set. | Optional[str]
description | Description for the policy | Optional[Union[str, DynamicStringField]]
reference | The reason this policy exists, often a link to documentation | Optional[Union[str, DynamicStringField]]
reports | A mapping of framework or report names to values this policy covers for that framework | Optional[Dict[str, List[str]]]
runbook | The actions to be carried out if this policy fails, often a link to documentation | Optional[Union[str, DynamicStringField]]
tags | Tags used to categorize this policy | Optional[Union[str, List[str]]]
unit_tests | Unit tests for this policy | Optional[Union[Union[JSONUnitTest], List[Union[JSONUnitTest]]]]
alert_title | Title to use in the alert | Optional[Callable[[PantherEvent], str]]
alert_context | Optional JSON to attach to alerts generated by this policy | Optional[Callable[[PantherEvent], Dict[str, Any]]]
alert_grouping | Configuration for how an alert is grouped | Optional[AlertGrouping]
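For illustration only, here is a minimal policy sketch built from the fields above; the resource type, the attribute name, and the filter behavior are assumptions rather than values taken from this document (for policies, the filter input is the resource under evaluation rather than a log event):

from panther_sdk import detection

# Hypothetical check: flag resources that do not have an "Encryption" attribute set.
# The attribute name is illustrative, not taken from a real resource schema.
def filter_unencrypted(resource, params):
    return resource.get("Encryption") is None

detection.Policy(
    policy_id="Example.S3.Bucket.Encryption",
    resource_types=["AWS.S3.Bucket"],  # assumed resource type name
    severity=detection.SeverityMedium,
    enabled=True,
    filters=[detection.PythonFilter(func=filter_unencrypted)],
    description="Illustrative policy sketch; adapt the resource type and filter to your environment.",
)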
Rule
Define a Rule-type detection to execute against log data in your Panther instance
Field | Description | Type
---|---|---
rule_id | ID for the rule | str
severity | Severity for the rule | Union[str, DynamicStringField]
threshold | Number of matches received before an alert is triggered | int
name | Display name for the rule | Optional[str]
log_types | Log Types to associate with this rule | Union[str, List[str]]
filters | Define event filters for the rule | Union[Union[PythonFilter], List[Union[PythonFilter]]]
enabled | Whether the rule is enabled or not | bool
unit_tests | Unit tests for the rule | Optional[Union[Union[JSONUnitTest], List[Union[JSONUnitTest]]]]
tags | Tags for the rule | Optional[Union[str, List[str]]]
reference | Reference for the rule | Optional[Union[str, DynamicStringField]]
runbook | Runbook for the rule | Optional[Union[str, DynamicStringField]]
description | Description for the rule | Optional[Union[str, DynamicStringField]]
summary_attrs | Summary Attributes for the rule | Optional[List[str]]
reports | Report mappings for the rule | Optional[Dict[str, List[str]]]
destinations | Alert destinations for the rule | Optional[Union[str, List[str], DynamicDestinations]]
alert_title | Title to use in the alert | Optional[Callable[[PantherEvent], str]]
alert_context | Optional JSON to attach to alerts generated by this rule | Optional[Callable[[PantherEvent], Dict[str, Any]]]
alert_grouping | Configuration for how an alert is grouped | Optional[AlertGrouping]
ScheduledRule
Define a ScheduledRule-type detection to execute against query results in your Panther instance
Field | Description | Type
---|---|---
rule_id | ID for the scheduled rule | str
severity | What severity alerts generated from this scheduled rule get assigned | Union[str, DynamicStringField]
threshold | Number of matches received before an alert is triggered | int
name | Display name for the scheduled rule | Optional[str]
scheduled_queries | Scheduled queries to associate with this scheduled rule | Union[str, List[str]]
filters | Define event filters for the scheduled rule | Union[Union[PythonFilter], List[Union[PythonFilter]]]
enabled | Whether the scheduled rule is enabled or not | bool
unit_tests | Unit tests for the scheduled rule | Optional[Union[Union[JSONUnitTest], List[Union[JSONUnitTest]]]]
tags | Tags for the scheduled rule | Optional[Union[str, List[str]]]
reference | Reference for the scheduled rule | Optional[Union[str, DynamicStringField]]
runbook | Runbook for the scheduled rule | Optional[Union[str, DynamicStringField]]
description | Description for the scheduled rule | Optional[Union[str, DynamicStringField]]
summary_attrs | Summary Attributes for the scheduled rule | Optional[List[str]]
reports | Report mappings for the scheduled rule | Optional[Dict[str, List[str]]]
destinations | Alert destinations for the scheduled rule | Optional[Union[str, List[str], DynamicDestinations]]
alert_title | Title to use in the alert | Optional[Callable[[PantherEvent], str]]
alert_context | Optional JSON to attach to alerts generated by this scheduled rule | Optional[Callable[[PantherEvent], Dict[str, Any]]]
alert_grouping | Configuration for how an alert is grouped | Optional[AlertGrouping]
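Here is a hedged sketch of a scheduled rule tied to a scheduled query; the query name and the column referenced in the filter are placeholders, and each query result row is assumed to be presented to the filter as the event:

from panther_sdk import detection

# Hypothetical filter over rows returned by the scheduled query.
def filter_nonzero_failures(event, params):
    return event["failed_login_count"] > 0  # assumed column name, for illustration

detection.ScheduledRule(
    rule_id="Example.Scheduled.FailedLogins",
    scheduled_queries=["example-failed-logins-query"],  # assumed scheduled query name
    severity=detection.SeverityMedium,
    enabled=True,
    filters=[detection.PythonFilter(func=filter_nonzero_failures)],
)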
query module
CronSchedule
Cron expression based schedule definition for a query
Field | Description | Type
---|---|---
expression | Defines how often queries using this schedule run | str
timeout_minutes | Defines the timeout applied to queries with this schedule | int
IntervalSchedule
Interval based schedule definition for a query
Field | Description | Type
---|---|---
rate_minutes | Defines how often queries using this schedule run | int
timeout_minutes | Defines the timeout applied to queries with this schedule | int
Query
A saved or scheduled query
Field | Description | Type
---|---|---
name | Unique name for the query | str
description | Short description for the query | str
default_database | Default database for the query | str
sql | SQL statement | str
enabled | Whether the query is enabled or not | bool
tags | Tags for the query | Optional[Union[str, List[str]]]
schedule | Schedule attached to the query | Optional[Union[IntervalSchedule, CronSchedule]]
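Putting the query module's dataclasses together, here is a sketch of a scheduled query; the import path follows the module heading above, and the database name and SQL statement are placeholders:

from panther_sdk import query

query.Query(
    name="example-failed-logins-query",  # matches the assumed name used in the ScheduledRule sketch above
    description="Hourly failed-login counts per host (illustrative)",
    default_database="panther_logs",  # assumed database name
    sql="SELECT host, COUNT(*) AS failed_login_count FROM example_logins WHERE success = FALSE GROUP BY host",  # placeholder SQL
    enabled=True,
    schedule=query.IntervalSchedule(
        rate_minutes=60,
        timeout_minutes=5,
    ),
)

A CronSchedule(expression=..., timeout_minutes=...) could be used in place of the IntervalSchedule when a cron expression is preferred.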
schema module
DataModelMapping
Field | Description | Type
---|---|---
name | Name of the data model field. This will be the name used when accessing the field from within detections. | str
path | Path to the target value in the Panther Event. This can be a simple field name or a complete JSON path starting with a $. JSON path syntax must be compatible with the jsonpath-ng Python package. | Optional[str]
func | A Python function to access the target value. The input is the Panther Event and the output is the target value in the Panther Event. | Optional[Callable[[PantherEvent], Any]]
DataModel
Data Models provide a way to configure a set of unified fields across all log types.
Field | Description | Type
---|---|---
data_model_id | The unique identifier of the data model. | str
name | What name to display in the UI and alerts. The data_model_id will be displayed if this field is not set. | Optional[str]
enabled | Whether this data model is enabled. | Optional[bool]
log_type | What log type this data model will apply to. | str
mappings | Mapping from source field name or method to unified data model field name. | Union[DataModelMapping, List[DataModelMapping]]
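As a sketch, here is a data model that maps the sample Internal.HTTP.Traffic log onto unified field names; the unified names and the schema import path are assumptions for illustration:

from panther_sdk import schema

# Derive a normalized source host with a function rather than a path.
def get_source_host(event):
    return event["host"]

schema.DataModel(
    data_model_id="Internal.HTTP.Traffic.DataModel",
    log_type="Internal.HTTP.Traffic",
    enabled=True,
    mappings=[
        schema.DataModelMapping(name="http_method", path="method"),
        schema.DataModelMapping(name="source_host", func=get_source_host),
    ],
)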