Aqua Security SDK to pull data from Aquasec Tenant and CSPM for auditing
aquasec-api
Aqua Sec Cloud Security API tool for interacting with Aqua Security CSPM Enterprise and Workload.
Documents
Installation
Install
Production Version:
python -m pip install aquasec-api
Testing Version:
pip install -i https://test.pypi.org/simple/ aquasec-api
Configurations
You can interact with the SDK directly and pass in the variables needed to obtain credentials. Alternatively, these values can be supplied through a configuration file or environment variables.
- Using system environment variables. Below is a sample.
  Required:
  AQUA_WORKLOAD_API_KEY="workload key"
  AQUA_WORKLOAD_API_SECRET="workload secret"
  AQUA_CSPM_API_KEY="cspm key"
  AQUA_CSPM_API_SECRET="cspm secret"
  AQUA_API_VERSION='v2'
  Optional:
  AQUA_LOGNAME="aquasec.log"
  AQUA_LOGLOCATION="/tmp/logs/"
  AQUA_LOGSTREAM=true
  AQUA_LOGGING="INFO"
  AQUA_SET_LOG=true
  AQUA_CERT=false
- Using a YAML configuration file located at ~/.config/.aquaconf. If a YAML config is found, its values take priority and override any environment variables.
  Required:
  AQUA_WORKLOAD_API_KEY: "workload key"
  AQUA_WORKLOAD_API_SECRET: "workload secret"
  AQUA_CSPM_API_KEY: "cspm key"
  AQUA_CSPM_API_SECRET: "cspm secret"
  AQUA_API_VERSION: 'v2'
  Optional:
  AQUA_CERT: true
  AQUA_LOGGING: 'DEBUG'
  AQUA_LOGNAME: 'aquas.log'
  AQUA_LOGSTREAM: true
  AQUA_SET_LOG: true
  AQUA_LOGLOCATION: "/tmp/logs/"
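The precedence described above (YAML values win over environment variables) can be sketched roughly as follows. This is only an illustration, not the SDK's actual loader: `resolve_settings` and `REQUIRED_KEYS` are hypothetical names, and the YAML file is assumed to have already been parsed into a dict.

```python
from typing import Mapping, Optional

# Required keys as listed in this README.
REQUIRED_KEYS = (
    "AQUA_WORKLOAD_API_KEY",
    "AQUA_WORKLOAD_API_SECRET",
    "AQUA_CSPM_API_KEY",
    "AQUA_CSPM_API_SECRET",
    "AQUA_API_VERSION",
)


def resolve_settings(env: Mapping[str, str],
                     file_settings: Optional[Mapping[str, object]] = None) -> dict:
    """Hypothetical helper: overlay parsed YAML values on environment values."""
    # Start from the AQUA_* environment variables only.
    settings = {key: value for key, value in env.items() if key.startswith("AQUA_")}
    # Values from the config file win when both sources define a key.
    settings.update(file_settings or {})
    missing = [key for key in REQUIRED_KEYS if key not in settings]
    if missing:
        raise KeyError("missing required settings: " + ", ".join(missing))
    return settings
```

In practice `env` would be `os.environ` and `file_settings` the parsed contents of `~/.config/.aquaconf`.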
NOTE: The certificate setting (AQUA_CERT) controls verification for the REST API calls. It is used unless you pass your own verify= in a method call. As in the Requests module, it is a (str|bool) value that defaults to True. If it is a string, the SDK confirms that the string points to a file and uses that file as the certificate to verify against, for example behind a proxy forwarder.
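As a rough sketch of the (str|bool) behaviour in the note, the helper below returns a value suitable for the `verify=` argument of a Requests call. `resolve_verify` is a hypothetical name, and raising `FileNotFoundError` for a string that is not a file is an assumption; the SDK may handle that case differently.

```python
import os
from typing import Union


def resolve_verify(cert: Union[str, bool] = True) -> Union[str, bool]:
    """Hypothetical helper mirroring the (str | bool) rule described in the note."""
    if isinstance(cert, bool):
        # True verifies against the default CA bundle, False disables verification.
        return cert
    if os.path.isfile(cert):
        # A string must point at an existing certificate file.
        return cert
    # Assumption: fail loudly rather than silently fall back to a default.
    raise FileNotFoundError(f"certificate file not found: {cert}")
```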
Usage
Workload Protection
To make sure workload auth works, pass the correct parameters for anything that is not set in the configuration. The following variables need to be set:
- allowed_endpoints: list
- Default: ["api_auditor"]
- csp_roles: list
- Default: ["ANY"]
>>> from aquasec.api import API
>>> api = API(csp_roles=["api_auditor"], allowed_endpoints=["ANY", "GET", "POST"])
INFO : Created WorkloadAuth Token for URL https://1234567890ab.cloud.aquasec.com
>>> api.get.workload_protection(url_path='license')
INFO : Created Workload URL=https://1234567890ab.cloud.aquasec.com/api/v2/license
DEBUG : Response Code: 200| Full Response: {"type":"Standard","organization":"ACME Corp, Inc.","account_id":"","client_name":"user@ACME Corp, Inc.-2023-03-29-StandardS","name":"","email":"john.doe@acme.com","num_agents":0,"num_microenforcers":0,"num_hostenforcers":0,"num_images":0,"num_functions":10000,"num_advanced_functions":0,"num_pas":-1,"num_code_repositories":0,"license_issue_date":1641772800,"license_exp_date":1768003199,"non_prod":false,"approved":true,"external_token":"","strict":false,"level":"Advanced","vpatch":true,"vpatch_coverage":0,"malware_protection":true,"tier":"","agents_running":0,"images_scanned":0,"num_protected_kube_nodes":0,"resource_status":{"Enforcers":"valid","Kubernetes cluster protection":"valid","MicroEnforcers":"valid","Repositories":"valid","VM Enforcers":"valid"}}
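The license response carries its dates as integers. Assuming `license_issue_date` and `license_exp_date` are Unix epoch seconds (an assumption; the response itself does not say so), they can be converted to readable UTC timestamps like this. `to_utc` is a hypothetical helper, and `license_info` is a trimmed copy of the response above.

```python
from datetime import datetime, timezone

# Trimmed copy of the license response shown above.
license_info = {
    "license_issue_date": 1641772800,
    "license_exp_date": 1768003199,
}


def to_utc(epoch_seconds: int) -> datetime:
    """Convert Unix epoch seconds to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)


issued = to_utc(license_info["license_issue_date"])
expires = to_utc(license_info["license_exp_date"])
print(issued.isoformat())   # 2022-01-10T00:00:00+00:00
print(expires.isoformat())  # 2026-01-09T23:59:59+00:00
```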
Bypass OS or YAML Configs:
>>> from aquasec.api import API
>>> api = API(api_key="7d6c02219a99", api_secret="0b3b928a1acd4c2580583cc160f49f5e",api_csp_roles=["CSP_USER"],allowed_endpoints=["ANY"])
INFO : Created WorkloadAuth Token for URL https://1234567890ab.cloud.aquasec.com
Common Params:
{
"page": 1,
"pagesize": 1000
}
In practice, "pagesize" works like the "limit" of a typical API call, capping how many records come back per request, while "page" works like an "offset", selecting which batch of results is returned. Responses typically look like this:
{
"count": 8793,
"page": 1,
"pagesize": 9000,
"result": [
{
"id": "9ad366ef-1494-44c1-9b1f-928bca02cf7d",
"name": "someserver.acme.com"
}
],
"query": {
"identifiers_only": false,
"enforcer_type": "",
"status": "",
"cluster": "",
"image_name": "",
"image_id": "",
"server_id": "",
"kube_enforcer_id": "",
"batch_name": "",
"compliant": "",
"address": "",
"cve": "",
"config_file_name": "",
"scope": "",
"machine_ids": null,
"kube_enforcer_exists": false,
"ke_kube_bench_feature_flag": false
},
"more_data_all_pages": 0,
"is_estimated_count": false
}
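Given a response shaped like the one above, a paging loop might look like the sketch below. It is not the SDK's `get_all` implementation: `fetch_all` and the `fetch_page(page, pagesize)` callable are hypothetical. It stops on a short page as well as on reaching `count`, so a `count` that never matches the records returned cannot cause an endless loop.

```python
from typing import Callable, Dict, List


def fetch_all(fetch_page: Callable[[int, int], Dict], pagesize: int = 1000) -> List[dict]:
    """Collect every record by walking pages until nothing more comes back.

    `fetch_page(page, pagesize)` is a hypothetical callable returning a dict
    shaped like the sample response (with "count" and "result" keys).
    """
    records: List[dict] = []
    page = 1
    while True:
        response = fetch_page(page, pagesize)
        batch = response.get("result") or []
        records.extend(batch)
        # A short (or empty) page means there is nothing left to fetch.
        if len(batch) < pagesize:
            break
        # Also stop once the reported total has been reached.
        count = response.get("count")
        if count is not None and len(records) >= count:
            break
        page += 1
    return records
```

With the SDK, `fetch_page` could wrap a call such as `api.get.workload_protection(url_path='hosts', params={'page': page, 'pagesize': pagesize})`, although passing `get_all=True` already covers the common case.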
Common useful endpoints:
- Get all hosts (I increase the page size based on my company's host count; you can build a refresh loop that keeps fetching until the number of records returned equals the count)
  >>> all_hosts = api.get.workload_protection(url_path='hosts', api_version='v1', get_all=True)
- Get CIS Benchmark Results
  >>> host_id = all_hosts['result'][0]['id']
  >>> cis_benchmark = api.get.workload_protection(url_path=f'risks/bench/{host_id}/bench_results')
- Get Kubernetes Resources
  >>> kube_resources = api.get.workload_protection(url_path='kubernetesresources', params={'pagesize': 1000})
- Get Kubernetes Applications
  >>> applications = api.get.workload_protection(endpoint='applications', api_version='v1')
- Get Containers
  >>> all_containers = api.get.workload_protection(url_path="containers", api_version='v2', get_all=True)
- Get CIS Bench Reports Directly
  # Kube Report Only for Production Cluster
  >>> kube_report = api.get.bench_reports(report_type='kube_bench', cluser_name='production')
  # Kube Report for all Clusters
  >>> kube_report = api.get.bench_reports(report_type='kube_bench')
  # Linux Report Only
  >>> all_linux_report = api.get.bench_reports(report_type='linux')
  # disa_stig Report
  >>> disa_stig_report = api.get.bench_reports(report_type='disa_stig')
  # Full CIS Benchmark Report on all Hosts
  >>> full_cis_report = api.get.bench_reports(report_type='all')
NOTE: The report published here is a standard collection of host key values with the raw report generated as the output. See Orchestration for adjusted reports.
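The "Get CIS Benchmark Results" example only looks at the first host. To build the benchmark `url_path` for every host in a hosts response, something like the following could be used. `bench_result_paths` is a hypothetical helper, and `sample` is a cut-down stand-in for the real response.

```python
from typing import Dict


def bench_result_paths(hosts_response: Dict) -> Dict[str, str]:
    """Map each host name to the url_path of its CIS benchmark results."""
    paths: Dict[str, str] = {}
    for host in hosts_response.get("result", []):
        host_id = host["id"]
        # Fall back to the id when a host has no name.
        name = host.get("name", host_id)
        paths[name] = f"risks/bench/{host_id}/bench_results"
    return paths


# Cut-down stand-in for the hosts response.
sample = {
    "result": [
        {"id": "9ad366ef-1494-44c1-9b1f-928bca02cf7d", "name": "someserver.acme.com"},
    ]
}
print(bench_result_paths(sample))
```

Each resulting path can then be passed as `url_path` to `api.get.workload_protection`.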
Orchestration
Building customized CIS Bench Reports
"""Sample Bench Report"""
import json
from aquasec.orchestration.bench_report import BenchReport
from aquasec.api import API
# create api token
api = API(api_key="7d6c02219a99", api_secret="0b3b928a1acd4c2580583cc160f49f5e",api_csp_roles=["CSP_USER"],allowed_endpoints=["ANY"])
# create and run report Bench Object
bench = BenchReport(api=api, report_type='all')
bench.run()
# write out report
with open('/var/tmp/bench_report_20230501.json', 'w', encoding='utf-8') as f:
    json.dump(bench.bench_report, f)
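The sample hard-codes the date in the file name. A small helper can derive it instead; this is a sketch only, `write_report` is a hypothetical name, and `report` stands in for `bench.bench_report` (assumed to be JSON-serialisable, as the sample's `json.dump` call implies).

```python
import json
from datetime import date
from pathlib import Path
from typing import Optional


def write_report(report: object, directory: str, day: Optional[date] = None) -> Path:
    """Write `report` as JSON to <directory>/bench_report_YYYYMMDD.json."""
    day = day or date.today()
    target = Path(directory) / f"bench_report_{day:%Y%m%d}.json"
    # Create the output directory if it does not exist yet.
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("w", encoding="utf-8") as handle:
        json.dump(report, handle, indent=2)
    return target
```

For example, `write_report(bench.bench_report, '/var/tmp')` would write a file named after today's date.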
Release Info
v0.0.4
- Added Orchestration and adjustment of bench report
- Known bug with the full flat_list; it still needs to be built
v0.0.3
- Added ability to POST
- Adding PUT
- Additional Datastructures
- Additional Delete Functionality
- Introducing Orchestration on Actions
- Bug in the response when we POST and possibly PUT an object: no JSON is returned, just a 204. This breaks the standard return expectation. Raised the issue with AquaSec. Until then, built a way to handle it safely and introduced a message response for those cases.
v0.0.2
- added retrieve_full_list() which allows get to retrieve all items.
- if "get_all" is specified in api.get.workload_protection() the variable will retrieve all possible values.
- updates to README.md, fixed a few typos.
- added ability to retrieve CIS bench reports directly without the need to run multiple calls.
- Fixed issue with "get_all" where it would go into an infinite loop since the count returned did not always match the results.
- Provides direct ability to call on all reports or individual reports.
- Fixed issue where passing api_key or api_secret when creating an API Object would not properly create the WorkloadAuth.
v0.0.1
- WorkloadAuth - usage to get auth token for workload tasks
- API - used to run api calls against CSPM or Workload
- Baseline version to interact with CSPM Enterprise and Workload
- GET is built out to handle almost any api call you need. You just need to figure out the endpoint and pass the url path through the workload_protection or cspm
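The v0.0.3 note mentions POST and PUT calls that answer with a bare 204 and no JSON. One way to keep a consistent return value in that situation is sketched below. This is not the SDK's actual handling: `parse_response` and the shape of the message dict are assumptions made for illustration.

```python
import json
from typing import Any, Optional


def parse_response(status_code: int, body: Optional[str]) -> Any:
    """Hypothetical normaliser: never fail on an empty response body."""
    if status_code == 204 or not (body or "").strip():
        # No JSON to decode, so hand back a small message instead.
        return {"status_code": status_code, "message": "no content returned"}
    return json.loads(body)
```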
Version
Version | Build | Changes |
---|---|---|
0.0.1 | a1 | Initial Alpha Release. Not working baseline for testing |
0.0.1 | a2 | built in decorator and two methods of handling different request types |
0.0.1 | a3 | fixed manifest for package deployment |
0.0.1 | a4 | fixed requirements |
0.0.1 | a5 | removed pathlib from requirements |
0.0.1 | rc1 | updated readme.md with instructions of usage |
0.0.1 | rc2 | issues with dataclasses module |
0.0.1 | rc3 | issues with dataclasses and requirements |
0.0.1 | rc4 | issues with dataclasses and requirements |
0.0.1 | rc8 | final release that solves how the auth works for CSPM and Workload Protection |
0.0.2 | a1 | Updated readme testing some additional modeling and possible integration scripts |
0.0.2 | a2 | Added ability to retrieve all functions leveraging paging |
0.0.2 | a3 | Added CIS benchmark reports, fixed bug with infinite get_all |
0.0.2 | rc1 | Bug with providing direct api information into api function with WorkloadAuth |
0.0.2 | final | completed orchestration of bench report and standard get workload checks |
0.0.3 | a1 | Intro to POST, PUT, DELETE and adding some datastructures for creating and manipulating AquaSec |
0.0.4 | a1 | updates to report structure |
0.0.4 | a2 | updates to some docstring in BenchReport |
NOTE: Use at your own risk! The API is provided as is and is still being built on.
Security Policy
Supported Versions
Use this section to tell people about which versions of your project are currently being supported with security updates.
Version | Supported |
---|---|
0.0.3 | :x: |
0.0.2 | :white_check_mark: |
Reporting a Vulnerability
Use this section to tell people how to report a vulnerability.
Tell them where to go, how often they can expect to get an update on a reported vulnerability, what to expect if the vulnerability is accepted or declined, etc.
Hashes for aquasec_api-0.0.4-py3-none-any.whl
Algorithm | Hash digest |
---|---|
SHA256 | 16137423d8737a2921e3e647359ad535725873a34d48e6b838aaa198f33d93ff |
MD5 | 8b3923c8030d23d7cb0b64e638113222 |
BLAKE2b-256 | 4f7fdd67d7035fe04bb5b4aeedd37cdf01381f468b8cc14c5ad4038bc78bccee |