A Python library to help with some common threat hunting data analysis operations
The huntlib module provides three major object classes as well as a few convenience functions.
- ElasticDF: Search Elastic and return results as a Pandas DataFrame
- SplunkDF: Search Splunk and return results as a Pandas DataFrame
- DomainTools: Convenience functions for accessing the DomainTools API, primarily focused around data enrichment (requires a DomainTools API subscription)
- data.read_json(): Read one or more JSON files and return a single Pandas DataFrame
- data.read_csv(): Read one or more CSV files and return a single Pandas DataFrame
- entropy() / entropy_per_byte(): Calculate Shannon entropy
- promptCreds(): Prompt for login credentials in the terminal or from within a Jupyter notebook.
- edit_distance(): Calculate how "different" two strings are from each other
huntlib now provides a library-wide configuration file,
~/.huntlibrc, allowing you to set certain runtime defaults. Consult the file
huntlibrc-sample in this repo for more information.
The ElasticDF() class searches Elastic and returns results as a Pandas DataFrame. This makes it easier to work with the search results using standard data analysis techniques.
Create a plaintext connection to the Elastic server, no authentication
e = ElasticDF( url="http://localhost:9200" )
The same, but with SSL and authentication
e = ElasticDF( url="https://localhost:9200", ssl=True, username="myuser", password="mypass" )
Fetch search results from an index or index pattern for the previous day
df = e.search_df( lucene="item:5282 AND color:red", index="myindex-*", days=1 )
The same, but do not flatten structures into individual columns. This will result in each structure having a single column with a JSON string describing the structure.
df = e.search_df( lucene="item:5282 AND color:red", index="myindex-*", days=1, normalize=False )
A more complex example, showing how to set the Elastic document type, use Python-style datetime objects to constrain the search to a certain time period, and a user-defined field against which to do the time comparisons. The result size will be limited to no more than 1500 entries.
from datetime import datetime, timedelta
df = e.search_df( lucene="item:5285 AND color:red", index="myindex-*", doctype="doc", date_field="mydate", start_time=datetime.now() - timedelta(days=8), end_time=datetime.now() - timedelta(days=6), limit=1500 )
The search_df methods will raise an exception
in the event that the search request is syntactically correct but is otherwise
invalid, for example if you request more results than the server
is able to provide. They will raise
AuthenticationErrorSearchException in the
event the server denied the credentials during login. They can also raise an
UnknownSearchException for other situations, in which case the exception
message will contain the original error message returned by Elastic so you
can figure out what went wrong.
The SplunkDF class searches Splunk and returns the results as a Pandas DataFrame. This makes it easier to work with the search results using standard data analysis techniques.
Establish a connection to the Splunk server. Whether this is SSL/TLS or not depends on the server, and you don't really get a say.
s = SplunkDF( host=splunk_server, username="myuser", password="mypass" )
SplunkDF will raise
AuthenticationErrorSearchException during initialization
in the event the server denied the supplied credentials.
Fetch all search results across all time
df = s.search_df( spl="search index=win_events EventCode=4688" )
Fetch only specific fields, still across all time
df = s.search_df( spl="search index=win_events EventCode=4688 | table ComputerName _time New_Process_Name Account_Name Creator_Process_ID New_Process_ID Process_Command_Line" )
Time bounded search, 2 days prior to now
df = s.search_df( spl="search index=win_events EventCode=4688", days=2 )
Time bounded search using Python datetime() values
from datetime import datetime, timedelta
df = s.search_df( spl="search index=win_events EventCode=4688", start_time=datetime.now() - timedelta(days=2), end_time=datetime.now() )
Time bounded search using Splunk notation
df = s.search_df( spl="search index=win_events EventCode=4688", start_time="-2d@d", end_time="@d" )
Limit the number of results returned to no more than 1500
df = s.search_df( spl="search index=win_events EventCode=4688", limit=1500 )
NOTE: The value specified as the
limit is also subject to a server-side max
value. By default, this is 50000 and can be changed by editing limits.conf on
the Splunk server. If you use the limit parameter, the number of search results
you receive will be the smallest of the following values: 1) the actual number of
results available, 2) the number you asked for with
limit, 3) the server-side
maximum result size. If you omit limit altogether, you will get the true
number of search results available without being subject to additional limits, though
your search may take much longer to complete.
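The rule in the note above can be written out as a small calculation. This is only an illustrative sketch: expected_result_count is a hypothetical helper, not part of huntlib, and the 50000 default is simply the server-side value quoted in the note.

```python
from typing import Optional

# Assumed server-side maximum, taken from the note above. The real value
# is whatever limits.conf on your Splunk server says.
SERVER_MAX_RESULTS = 50000


def expected_result_count(available: int,
                          limit: Optional[int] = None,
                          server_max: int = SERVER_MAX_RESULTS) -> int:
    """Hypothetical helper: how many rows a search should return."""
    if limit is None:
        # No limit requested: every available result comes back.
        return available
    # With a limit, the smallest of the three values wins.
    return min(available, limit, server_max)


print(expected_result_count(available=120000, limit=1500))   # 1500
print(expected_result_count(available=900, limit=1500))      # 900
print(expected_result_count(available=120000, limit=75000))  # 50000
print(expected_result_count(available=120000))               # 120000
```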
Return only specified fields
df = s.search_df( spl="search index=win_events EventCode=4688", fields="NewProcessName,SubjectUserName" )
NOTE: By default, Splunk will only return the fields you reference in the
search string (i.e., you must explicitly search on "NewProcessName" if you want
that field in the results). Usually this is not what we want. When fields is not
None, the query string will be rewritten with "| fields <fields>" at the end (e.g.,
search index=win_events EventCode=4688 | fields NewProcessName,SubjectUserName). This
works fine for most simple cases, but if you have a more complex SPL query and it breaks,
set fields=None in your function call to avoid this behavior.
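To make the rewrite concrete, here is a rough sketch of the string manipulation the note describes. It is not huntlib's actual code; append_fields_clause is a hypothetical name used only for illustration.

```python
from typing import Optional


def append_fields_clause(spl: str, fields: Optional[str]) -> str:
    """Hypothetical helper mimicking the rewrite described above."""
    if fields is None:
        # fields=None leaves the query string exactly as written.
        return spl
    # Otherwise a "| fields <fields>" clause is appended to the query.
    return f"{spl} | fields {fields}"


query = append_fields_clause(
    "search index=win_events EventCode=4688",
    "NewProcessName,SubjectUserName",
)
print(query)
```

Seeing the rewritten string makes it easier to understand why a complex query ending in its own pipeline stage might break once the extra clause is appended.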
Try to remove Splunk's "internal" fields from search results:
df = s.search_df( spl="search index=win_events EventCode=4688", internal_fields=False )
This will remove such fields as
_sourcetype as well as any other field whose name begins with
_. This behavior occurs by default (
internal_fields defaults to
False), but you can disable it by passing internal_fields=True.
Remove named field(s) from the search results:
df = s.search_df( spl="search index=win_events EventCode=4688", internal_fields="_bkt,_cd,_indextime,_raw,_serial,_si,_sourcetype,_subsecond,_time" )
In the event you need more control over which "internal" fields to drop, you can pass a comma-separated list of field names (NOTE: these can be any field, not just Splunk internal fields).
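The column filtering described for internal_fields can be sketched on a plain list of column names. This is an approximation for illustration, not the library's implementation: visible_columns is a hypothetical helper, and treating True as "keep everything" is an assumption inferred from False being the default that strips the fields.

```python
from typing import Iterable, List, Union


def visible_columns(columns: Iterable[str],
                    internal_fields: Union[bool, str] = False) -> List[str]:
    """Hypothetical helper: which columns survive a given setting."""
    cols = list(columns)
    if internal_fields is True:
        # Assumption: True keeps every column, internal ones included.
        return cols
    if internal_fields is False:
        # Default: drop every field whose name begins with an underscore.
        return [c for c in cols if not c.startswith("_")]
    # Otherwise treat the value as a comma-separated list of names to drop.
    unwanted = {name.strip() for name in internal_fields.split(",") if name.strip()}
    return [c for c in cols if c not in unwanted]


columns = ["_time", "_raw", "_sourcetype", "ComputerName", "EventCode"]
print(visible_columns(columns))                          # default behavior
print(visible_columns(columns, "_raw,_sourcetype"))      # named fields only
```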
Splunk's Python API can be quite slow, so to speed things up you may elect to spread the result retrieval among multiple cores. The default is to use one (1) extra core, but you can use the
processes argument to
search_df() to set this higher if you like.
df = s.search_df( spl="search index=win_events EventCode=4688", processes=4 )
If you prefer to use all your cores, try something like:
from multiprocessing import cpu_count
df = s.search_df( spl="search index=win_events EventCode=4688", processes=cpu_count() )
NOTE: You may have to experiment to find the optimal number of parallel processes for your specific environment. Maxing out the number of workers doesn't always give the best performance.
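One way to run that experiment is to time the same search at several worker counts. The harness below is a generic sketch, not part of huntlib: time_process_counts and fastest are hypothetical helpers, and run_search stands in for whatever callable you supply to execute the search.

```python
import time
from typing import Callable, Dict, Iterable


def time_process_counts(run_search: Callable[[int], object],
                        candidates: Iterable[int]) -> Dict[int, float]:
    """Call run_search(n) once per candidate and record elapsed seconds."""
    timings: Dict[int, float] = {}
    for n in candidates:
        started = time.perf_counter()
        run_search(n)
        timings[n] = time.perf_counter() - started
    return timings


def fastest(timings: Dict[int, float]) -> int:
    """Return the process count with the shortest recorded time."""
    return min(timings, key=timings.get)


# Example wiring, assuming an existing SplunkDF object named s:
#   timings = time_process_counts(
#       lambda n: s.search_df(spl="search index=win_events EventCode=4688",
#                             processes=n),
#       [1, 2, 4, 8],
#   )
#   print(fastest(timings))
```

A single run per setting is noisy, so repeating the measurement a few times before settling on a value is probably worthwhile.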
The DomainTools class allows you to easily perform some common types of calls
to the DomainTools API. It uses their official
domaintools_api Python module
to do most of the work but is not a complete replacement for that module. In
particular, this class concentrates on a few calls that are most relevant for
data analytic style threat hunting (risk & reputation scores, WHOIS info, etc).
The DomainTools class can make use of the global config file
~/.huntlibrc to store the API username and secret key, if desired. See the
huntlibrc-sample file for more info.
from huntlib.domaintools import DomainTools
Instantiate a new DomainTools object:
dt = DomainTools( api_username="myuser", api_key="mysecretkey" )
Instantiate a new
DomainTools object using default creds stored in ~/.huntlibrc:
dt = DomainTools()
Look up API call limits and usage info for the authenticated user:
Return the list of API calls to which the authenticated user has access:
Return basic WHOIS info for a domain or IP address:
Return WHOIS info with additional fields parsed from the text part of the record:
Find newly-activated or pending domain registrations matching all the supplied search terms:
dt.brand_monitor('myterm')
dt.brand_monitor('myterm1|myterm2|myterm3') # terms are ANDed together
Look up basic info about a domain's DNS, WHOIS, hosting and web site in one query.
Return a list of risk scores for a domain, according to different risk factors:
Return a single consolidated risk score for a domain:
Enrich a pandas DataFrame containing a mixture of domains and/or IP addresses in a column called 'iocs':
df = dt.enrich(df, column='iocs')
Enrichment tends to add a large number of columns, which you may not need. Use the
fields parameter if you know exactly what you want:
df = dt.enrich( df, column='iocs', fields=[ 'dt_whois.registration.created', 'dt_reputation.risk_score' ] )
Enrichment may take quite some time with a large dataset. If you're antsy, try turning on the progress bars:
df = dt.enrich(df, column='iocs', progress_bar=True)
The huntlib.data module contains functions that make it easier to deal with data files.
Reading Multiple Data Files
huntlib provides two convenience functions to replace the standard Pandas
read_json() and read_csv() functions. These replacement functions work exactly the same as their originals and take all the same arguments. The only difference is that they are capable of accepting a filename wildcard in addition to the name of a single file. All files matching the wildcard expression will be read and returned as a single DataFrame.
Start by importing the functions from the module:
from huntlib.data import read_csv, read_json
Here's an example of reading a single JSON file, where each line is a separate JSON document:
df = read_json("data.json", lines=True)
Similarly, this will read all JSON files in the current directory:
df = read_json("*.json", lines=True)
The read_csv function works the same way:
df = read_csv("data.csv")
df = read_csv("*.csv")
Consult the Pandas documentation for information on supported options for read_csv() and read_json().
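The wildcard behavior can be pictured as "expand the pattern, read each file, concatenate the results". The sketch below shows that idea using only the standard library, so it returns a list of row dictionaries instead of a DataFrame. It is not huntlib's implementation, and read_csv_rows is a hypothetical name.

```python
import csv
import glob
from typing import Dict, List


def read_csv_rows(pattern: str) -> List[Dict[str, str]]:
    """Read every CSV file matching pattern into one list of row dicts."""
    rows: List[Dict[str, str]] = []
    # sorted() keeps the file order deterministic across runs.
    for path in sorted(glob.glob(pattern)):
        with open(path, newline="", encoding="utf-8") as handle:
            # Each file contributes its rows to the same combined list.
            rows.extend(csv.DictReader(handle))
    return rows
```

Because a plain filename is also a valid glob pattern, the same function handles both the single-file and the wildcard case, which mirrors how the huntlib functions are described above.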
We define two entropy functions,
entropy() and entropy_per_byte(). Both accept a single string as a parameter. The
entropy() function calculates the Shannon entropy of the given string, while
entropy_per_byte() attempts to normalize across strings of various lengths by returning the Shannon entropy divided by the length of the string. Both return values are floats.
>>> entropy("The quick brown fox jumped over the lazy dog.")
4.425186429663008
>>> entropy_per_byte("The quick brown fox jumped over the lazy dog.")
0.09833747621473352
The higher the value, the more data is potentially embedded in the string.
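For readers who want to see the calculation itself, here is a minimal sketch of Shannon entropy over character frequencies. It assumes base-2 logarithms, which is consistent with the sample values shown above to within floating-point rounding, but it is not huntlib's own code and the function names are hypothetical.

```python
import math
from collections import Counter


def shannon_entropy(text: str) -> float:
    """Shannon entropy of text in bits per character (log base 2)."""
    if not text:
        return 0.0
    total = len(text)
    counts = Counter(text)
    # Sum -p * log2(p) over the observed character frequencies.
    return -sum((n / total) * math.log2(n / total) for n in counts.values())


def shannon_entropy_per_char(text: str) -> float:
    """Entropy divided by string length, to compare strings of varied size."""
    if not text:
        return 0.0
    return shannon_entropy(text) / len(text)


sample = "The quick brown fox jumped over the lazy dog."
print(shannon_entropy(sample))           # roughly 4.425
print(shannon_entropy_per_char(sample))  # roughly 0.0983
```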
Sometimes you need to provide credentials for a service, but don't want to hard-code them into your scripts, especially if you're collaborating on a hunt.
huntlib provides the
promptCreds() function to help with this. This function works well both in the terminal and when called from within a Jupyter notebook.
Call it like so:
(username, password) = promptCreds()
You can change one or both of the username/password prompts by passing arguments:
(username, password) = promptCreds(uprompt="LAN ID: ", pprompt="LAN Pass: ")
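A function like this can be approximated with the standard library's input() and getpass.getpass(). The sketch below is an assumption about the general approach, not huntlib's implementation: prompt_creds is a hypothetical name, the default prompt strings are made up, and the read_user and read_pass parameters exist only so the function can be exercised without a terminal.

```python
import getpass
from typing import Callable, Tuple


def prompt_creds(uprompt: str = "Username: ",
                 pprompt: str = "Password: ",
                 read_user: Callable[[str], str] = input,
                 read_pass: Callable[[str], str] = getpass.getpass
                 ) -> Tuple[str, str]:
    """Hypothetical stand-in: ask for a username, then a hidden password."""
    username = read_user(uprompt)
    # getpass.getpass() reads the password without echoing it.
    password = read_pass(pprompt)
    return (username, password)


# Interactive use would look like:
#   (username, password) = prompt_creds(uprompt="LAN ID: ", pprompt="LAN Pass: ")
```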
String similarity can be expressed in terms of "edit distance", or the number of single-character edits necessary to turn the first string into the second string. This is often useful when, for example, you want to find two strings that are very similar but not identical (such as when hunting for process impersonation).
There are a number of different ways to compute similarity.
huntlib provides the
edit_distance() function for this, which supports several algorithms:
- Levenshtein Distance
- Damerau-Levenshtein Distance
- Hamming Distance
- Jaro Distance
- Jaro-Winkler Distance
Here's an example:
>>> huntlib.edit_distance('svchost', 'scvhost')
1
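You can specify a different algorithm using the
method parameter. Valid method names correspond to the algorithms listed above, such as levenshtein and
jaro-winkler. The default is Damerau-Levenshtein distance, which counts the two transposed letters in the example above as a single edit. Levenshtein distance counts the same transposition as two edits: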
>>> huntlib.edit_distance('svchost', 'scvhost', method='levenshtein')
2
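The difference between the two results comes down to whether swapping adjacent characters counts as one edit. The sketch below implements plain Levenshtein distance and the "optimal string alignment" variant of Damerau-Levenshtein in pure Python. These are textbook versions for illustration, with hypothetical function names, and are not the implementations huntlib uses.

```python
def levenshtein(a: str, b: str) -> int:
    """Insertions, deletions and substitutions each cost one edit."""
    previous = list(range(len(b) + 1))
    for i, ca in enumerate(a, start=1):
        current = [i]
        for j, cb in enumerate(b, start=1):
            cost = 0 if ca == cb else 1
            current.append(min(previous[j] + 1,          # deletion
                               current[j - 1] + 1,       # insertion
                               previous[j - 1] + cost))  # substitution
        previous = current
    return previous[-1]


def osa_distance(a: str, b: str) -> int:
    """Levenshtein plus adjacent transposition as a single edit."""
    rows, cols = len(a) + 1, len(b) + 1
    d = [[0] * cols for _ in range(rows)]
    for i in range(rows):
        d[i][0] = i
    for j in range(cols):
        d[0][j] = j
    for i in range(1, rows):
        for j in range(1, cols):
            cost = 0 if a[i - 1] == b[j - 1] else 1
            d[i][j] = min(d[i - 1][j] + 1,
                          d[i][j - 1] + 1,
                          d[i - 1][j - 1] + cost)
            # Adjacent characters swapped: allow a one-step transposition.
            if (i > 1 and j > 1
                    and a[i - 1] == b[j - 2] and a[i - 2] == b[j - 1]):
                d[i][j] = min(d[i][j], d[i - 2][j - 2] + 1)
    return d[-1][-1]


print(levenshtein("svchost", "scvhost"))   # 2
print(osa_distance("svchost", "scvhost"))  # 1
```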