Aqua Security SDK to pull data from Aquasec Tenant and CSPM for auditing

Project description

aquasec-api

Aqua Sec Cloud Security API tool for interacting with Aqua Security CSPM Enterprise and Workload.

Documents

Installation

Install

Production Version:

python -m pip install aquasec-api

Testing Version:

pip install -i https://test.pypi.org/simple/ aquasec-api

Configurations

You can interact with the SDK directly and pass the required credentials as arguments. Alternatively, the credentials can be read from a configuration file or from environment variables.

  1. Using system environment variables. Below is a sample. Required:

    AQUA_WORKLOAD_API_KEY="workload key"
    AQUA_WORKLOAD_API_SECRET="workload secret"
    AQUA_CSPM_API_KEY="cspm key"
    AQUA_CSPM_API_SECRET="cspm secret"
    AQUA_API_VERSION="v2"
    

    Optional:

    AQUA_LOGNAME="aquasec.log"
    AQUA_LOGLOCATION="/tmp/logs/"
    AQUA_LOGSTREAM=true
    AQUA_LOGGING="INFO"
    AQUA_SET_LOG=true
    AQUA_CERT=false
    
  2. Using a YAML configuration file located at ~/.config/.aquaconf. If a YAML config is found, its values take priority and override any environment variables.

    Required:

    AQUA_WORKLOAD_API_KEY: "workload key"
    AQUA_WORKLOAD_API_SECRET: "workload secret"
    AQUA_CSPM_API_KEY: "cspm key"
    AQUA_CSPM_API_SECRET: "cspm secret"
    AQUA_API_VERSION: 'v2'
    

    Optional:

    AQUA_CERT: true
    AQUA_LOGGING: 'DEBUG'
    AQUA_LOGNAME: 'aquas.log'
    AQUA_LOGSTREAM: true
    AQUA_SET_LOG: true
    AQUA_LOGLOCATION: "/tmp/logs/"
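
The precedence rule described above (values from the YAML file win over environment variables) can be sketched with plain dictionaries. This is only an illustration and not the library's own loader: `merge_settings` and `missing_required` are hypothetical helper names, and the YAML file is assumed to have been parsed into a dict already.

```python
import os

# Required setting names, taken from the lists above.
REQUIRED = (
    "AQUA_WORKLOAD_API_KEY",
    "AQUA_WORKLOAD_API_SECRET",
    "AQUA_CSPM_API_KEY",
    "AQUA_CSPM_API_SECRET",
    "AQUA_API_VERSION",
)


def merge_settings(env, yaml_values):
    """Hypothetical helper: combine AQUA_* settings, letting YAML values win."""
    merged = {k: v for k, v in env.items() if k.startswith("AQUA_")}
    merged.update(yaml_values)  # YAML overrides the environment
    return merged


def missing_required(settings):
    """Hypothetical helper: list the required names that are absent or empty."""
    return [name for name in REQUIRED if not settings.get(name)]


if __name__ == "__main__":
    settings = merge_settings(dict(os.environ), yaml_values={})
    print("Missing required settings:", missing_required(settings))
```

Running a check like this before creating the API object makes a missing key or secret visible early, rather than as an authentication failure later.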
    

NOTE: The certificate setting (AQUA_CERT) is the verification used for the REST API calls. It is applied unless you pass your own verify= in a method call. As in the Requests module, this is a (str|bool) value that defaults to True. If it is a string, the SDK confirms that the string is a file and treats it as the location of a specific certificate to verify against, for example when traffic goes through a proxy forwarder.

Usage

Workload Protection

To ensure that workload authentication works, pass the correct parameters for anything that is not set in the configuration. You will need to set these variables:

  • allowed_endpoints: list
    • Default: ["api_auditor"]
  • csp_roles: list
    • Default: ["ANY"]
>>> from aquasec.api import API
>>> api = API(csp_roles=["api_auditor"], allowed_endpoints=["ANY", "GET", "POST"])
INFO    : Created WorkloadAuth Token for URL https://1234567890ab.cloud.aquasec.com
>>> api.get.workload_protection(url_path='license')
INFO    : Created Workload URL=https://1234567890ab.cloud.aquasec.com/api/v2/license
DEBUG   : Response Code: 200| Full Response: {"type":"Standard","organization":"ACME Corp, Inc.","account_id":"","client_name":"user@ACME Corp, Inc.-2023-03-29-StandardS","name":"","email":"john.doe@acme.com","num_agents":0,"num_microenforcers":0,"num_hostenforcers":0,"num_images":0,"num_functions":10000,"num_advanced_functions":0,"num_pas":-1,"num_code_repositories":0,"license_issue_date":1641772800,"license_exp_date":1768003199,"non_prod":false,"approved":true,"external_token":"","strict":false,"level":"Advanced","vpatch":true,"vpatch_coverage":0,"malware_protection":true,"tier":"","agents_running":0,"images_scanned":0,"num_protected_kube_nodes":0,"resource_status":{"Enforcers":"valid","Kubernetes cluster protection":"valid","MicroEnforcers":"valid","Repositories":"valid","VM Enforcers":"valid"}}
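
The license response above carries its dates as integers. Assuming these are Unix timestamps in seconds (the sample values are consistent with that, but the source does not state it), they can be converted to readable UTC dates as sketched below. The dict here is a hand-copied excerpt of the sample response, not the return value of a live call.

```python
from datetime import datetime, timezone

# Two fields copied from the sample license response above.
license_info = {
    "license_issue_date": 1641772800,
    "license_exp_date": 1768003199,
}


def to_utc(epoch_seconds):
    """Convert a Unix timestamp in seconds to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)


issued = to_utc(license_info["license_issue_date"])
expires = to_utc(license_info["license_exp_date"])
print(issued.isoformat())   # 2022-01-10T00:00:00+00:00
print(expires.isoformat())  # 2026-01-09T23:59:59+00:00
```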

Bypass OS or YAML Configs:

>>> from aquasec.api import API
>>> api = API(api_key="7d6c02219a99", api_secret="0b3b928a1acd4c2580583cc160f49f5e", api_csp_roles=["CSP_USER"], allowed_endpoints=["ANY"])
INFO    : Created WorkloadAuth Token for URL https://1234567890ab.cloud.aquasec.com

Common Params:

{
    "page": 1,
    "pagesize": 1000
}

In practice, "pagesize" behaves like the "limit" of a typical API call (the maximum number of records returned per request), while "page" behaves like an "offset" (which slice of the results is returned). Responses typically look like this:

{
    "count": 8793,
    "page": 1,
    "pagesize": 9000,
    "result": [
        {
            "id": "9ad366ef-1494-44c1-9b1f-928bca02cf7d",
            "name": "someserver.acme.com"
        }
    ],
    "query": {
        "identifiers_only": false,
        "enforcer_type": "",
        "status": "",
        "cluster": "",
        "image_name": "",
        "image_id": "",
        "server_id": "",
        "kube_enforcer_id": "",
        "batch_name": "",
        "compliant": "",
        "address": "",
        "cve": "",
        "config_file_name": "",
        "scope": "",
        "machine_ids": null,
        "kube_enforcer_exists": false,
        "ke_kube_bench_feature_flag": false
    },
    "more_data_all_pages": 0,
    "is_estimated_count": false
}
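
Given a response of this shape, collecting every record means requesting pages until the number of records gathered reaches "count". The sketch below shows one way to do that, with a guard for the case where the reported count never matches. `fetch_all` and `fake_fetch` are hypothetical names, and `fake_fetch` merely stands in for a real API call, so this is not how the library's own get_all is implemented.

```python
def fetch_all(fetch_page, pagesize=1000, max_pages=1000):
    """Collect every record from a paged endpoint.

    fetch_page is a hypothetical callable taking (page, pagesize) and
    returning a dict shaped like the sample response ("count", "result").
    """
    records = []
    for page in range(1, max_pages + 1):
        response = fetch_page(page, pagesize)
        batch = response.get("result") or []
        records.extend(batch)
        # Stop on an empty page as well as on the reported count, so a
        # count that never matches cannot cause an endless loop.
        if not batch or len(records) >= response.get("count", 0):
            break
    return records


def fake_fetch(page, pagesize, total=25):
    """Stand-in for a real API call, serving `total` numbered records."""
    start = (page - 1) * pagesize
    ids = range(start, min(start + pagesize, total))
    return {"count": total, "page": page, "pagesize": pagesize,
            "result": [{"id": i} for i in ids]}


all_records = fetch_all(fake_fetch, pagesize=10)
print(len(all_records))  # 25
```

The `max_pages` cap is a second safety net: even if a server kept returning non-empty pages, the loop would still end.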

Common useful endpoints:

  • Get all hosts (I increase the page size based on my company's host count; you can also build a loop that keeps requesting pages until the number of records returned equals the count)

    >>> all_hosts = api.get.workload_protection(url_path='hosts', api_version='v1', get_all=True)
    
  • Get CIS Benchmark Results

    >>> host_id = all_hosts['result'][0]['id']
    >>> cis_benchmark = api.get.workload_protection(url_path=f'risks/bench/{host_id}/bench_results')
    
  • Get Kubernetes Resources

    >>> kube_resources = api.get.workload_protection(url_path='kubernetesresources', params={'pagesize': 1000})
    
  • Get Kubernetes Applications

    >>> applications = api.get.workload_protection(endpoint='applications', api_version='v1')
    
  • Get Containers

    >>> all_containers = api.get.workload_protection(url_path="containers", api_version='v2', get_all=True)
    
  • Get CIS Bench Reports Directly

    # Kube Report Only for Production Cluster
    >>> kube_report = api.get.bench_reports(report_type='kube_bench', cluser_name='production')
    # Kube Report for all Clusters
    >>> kube_report = api.get.bench_reports(report_type='kube_bench')
    # Linux Report Only
    >>> all_linux_report = api.get.bench_reports(report_type='linux')
    # disa_stig Report
    >>> disa_stig_report = api.get.bench_reports(report_type='disa_stig')
    # Full CIS Benchmark Report on all Hosts
    >>> full_cis_report = api.get.bench_reports(report_type='all')
    

    NOTE: The report published here is a standard collection of host key values with the raw report generated as the output. See Orchestration for adjusted reports.

Orchestration

Building customized CIS Bench Reports

"""Sample Bench Report"""

import json

from aquasec.orchestration.bench_report import BenchReport
from aquasec.api import API

# create api token
api = API(api_key="7d6c02219a99", api_secret="0b3b928a1acd4c2580583cc160f49f5e", api_csp_roles=["CSP_USER"], allowed_endpoints=["ANY"])

# create and run report Bench Object
bench = BenchReport(api=api, report_type='all')
bench.run()

# write out report
with open('/var/tmp/bench_report_20230501.json', 'w', encoding='utf-8') as f:
    json.dump(bench.bench_report, f)

Release Info

v0.0.4

  • Added Orchestration and adjustment of the bench report
  • Known bug with the full flat_list; it still needs to be built

v0.0.3

  • Added ability to POST
  • Adding PUT
  • Additional Datastructures
  • Additional Delete Functionality
  • Introducing Orchestration on Actions
  • Bug in the response code when we POST, and possibly PUT, an object: no JSON is returned, just a 204. This breaks the standard return expectation. Raised the issue with AquaSec. Until then, built a way to handle it safely and introduced a message response for those cases.

v0.0.2

  • added retrieve_full_list() which allows get to retrieve all items.
  • if "get_all" is specified in api.get.workload_protection() the variable will retrieve all possible values.
  • updates to README.md, fixed a few typos.
  • added ability to retrieve CIS bench reports directly without the need to run multiple calls.
  • Fixed issue with "get_all" where it would go into an infinite loop since the count returned did not always match the results.
  • Provides direct ability to call on all reports or individual reports.
  • Fixed issue where passing api_key or api_secret when creating an API Object would not properly create the WorkloadAuth.

v0.0.1

  • WorkloadAuth - usage to get auth token for workload tasks
  • API - used to run api calls against CSPM or Workload
  • Baseline version to interact with CSPM Enterprise and Workload
  • GET is built out to handle almost any api call you need. You just need to figure out the endpoint and pass the url path through the workload_protection or cspm

Version

Version  Build  Changes
0.0.1    a1     Initial Alpha Release. Not working baseline for testing
0.0.1    a2     built in decorator and two methods of handling different request types
0.0.1    a3     fixed manifest for package deployment
0.0.1    a4     fixed requirements
0.0.1    a5     removed pathlib from requirements
0.0.1    rc1    updated readme.md with instructions of usage
0.0.1    rc2    issues with dataclasses module
0.0.1    rc3    issues with dataclasses and requirements
0.0.1    rc4    issues with dataclasses and requirements
0.0.1    rc8    final release that solves how the auth works for CSPM and Workload Protection
0.0.2    a1     Updated readme testing some additional modeling and possible integration scripts
0.0.2    a2     Added ability to retrieve all functions leveraging paging
0.0.2    a3     Added CIS benchmark reports, Fix bug with infinite get_all
0.0.2    rc1    Bug with providing direct api information into api function with WorkloadAuth
0.0.2    final  completed orchestration of bench report and standard get workload checks
0.0.3    a1     Intro to POST, PUT, DELETE and adding some datastructures for creating and manipulating AquaSec
0.0.4    a1     updates to report structure
0.0.4    a2     updates to some docstring in BenchReport

NOTE: Use at your own risk! The API is provided as is and is still being built on.

Security Policy

Supported Versions

Version  Supported
0.0.3    No
0.0.2    Yes

