Skip to main content

A health check and RCA tool for kubernetes

Project description

Entities

Inventory

Given a resource type, gather the inventory of all the resources of that type. Schema is simple:

    "items": {
      "type": "object",
      "properties": {
        "name": {
          "type": "string"
        },
        "id": {
          "type": "string"
        },
        "command": {
          "type": "string"
        },
        "matchPattern": {
          "type": "string"
        }
      },
      "required": ["id", "command", "matchPattern"],
      "additionalProperties": false
    }

Given a resource type, the inventory is a list of items. Each item has a name, id, command and matchPattern. command is the CLI to run to get the inventory. matchPattern is the pattern to use to split the output of the CLI into individual items.

Rules

Schema provided below:

    "items": {
      "type": "object",
      "properties": {
        "name": {
          "type": "string"
        },
        "category": {
          "type": "string"
        },
        "severity": {
          "type": "string",
          "enum": ["High", "Medium", "Low"]
        },
        "inventoryId": {
          "type": "string"
        },
        "cmd": {
          "type": "string"
        },
        "primary_input_name": {
          "type": "string"
        },
        "secondary_input_name": {
          "type": "string"
        },
        "pattern": {
          "type": "string"
        },
        "diags": {
          "type": "string"
        }
      },
      "required": ["name", "category", "severity", "inventoryId", "cmd", "primary_input_name",  "pattern"],
      "additionalProperties": false
    }

invetoryId is the id of the inventory to use for this rule. cmd is the CLI to run for the individual item. pattern is the pattern to establish return value of the rule. diags is the CLI to run to get the diagnostics for the rule in case of failures. primary_input_name is the name of the primary input to the rule. This is the resource type that the rule is subscribed to. In the CLI, it will likely be found in double curly braces. E.g. kubectl describe node {{node}} -o json

We are finding that this model does not fit well with doing something like kubectl get pods | grep -v Running. Because we are not longer working with a single resource, but rather performing a scan. And in this case the system wants to perform the scan itself. Single rules are relegated to just looking at state pertaining to a single resource. Even with a more compliant model, like kubectl describe pod {pod} | grep CrashLoopBackOff is turning out to be painful to code. This is because we don't have a good way of grepping.

Conclusion

Our model of having a check examine a singular object cannot scale if we use kubectl. This means that we have to go with the native python approach. In order to get value out of the LLM models however, the rule should include the kubectl command that its implementing to accomplish its task. ie what kubectl command would you type to get the same information that the rule is checking.

This leads us to an interesting approach: write the kubectl command for all the checks we are implementing, and use LLM to generate the code for the command. Ofcourse we need to review the generate the code, but this creates a good lock-step mechanism that we can.

The alternative would be to move away from a per object check into a check that examines all the objects of the given type.

Sample interaction with interactive python below


>>> s = """
...     {
...         "category": "Configuration",
...         "cmd": "kubectl describe nodes {{node}}",
...         "pattern": "grep 'cpu|memory|disk'",
...         "primary_input_name": "node",
...         "name": "Resource Limits",
...         "severity": "Medium",
...         "suite": "Generic"
...     }
... """
>>>
>>> s
'\n    {\n        "category": "Configuration",\n        "cmd": "kubectl describe nodes {{node}}",\n        "pattern": "grep \'cpu|memory|disk\'",\n        "primary_input_name": "node",\n        "name": "Resource Limits",\n        "severity": "Medium",\n        "suite": "Generic"\n    }\n'
>>> import json
>>> json.loads(s)
{'category': 'Configuration', 'cmd': 'kubectl describe nodes {{node}}', 'pattern': "grep 'cpu|memory|disk'", 'primary_input_name': 'node', 'name': 'Resource Limits', 'severity': 'Medium', 'suite': 'Generic'}
>>> r=json.loads(s)
>>> r.get('cmd')
'kubectl describe nodes {{node}}'
>>> r.get('primary_input_name')
'node'
>>> r.get('cmd').replace('{{' + r.get('primary_input_name') + '}}', 'n25')
'kubectl describe nodes n25'
>>>

The crawler can use something like this to fill up the template variables with the values from inventory. There is some duplication right now with the double curly and the primary_input_name. Lets see if it can be simplified.

    {
        "category": "Configuration",
        "cmd": "kubectl get pods --field-selector spec.nodeName=<NODE_NAME> -o json",
        "pattern": "Check .spec.hostNetwork is true",
        "name": "Misconfigured HostNetwork",
        "severity": "Medium",
        "suite": "Nodes - pod properties and errors"
    },

Simple Rules

Subscribe to a given resource inventory, perform a check on it using

  1. One or more CLIs to enrich
  2. match the outputs of the CLIs in the previous step against a set of patterns
  3. on match, collect a diagnostics output
  4. return a boolean, diagnostics output

This is where the expert system is getting created. A rule is a specific failure signature that can be detected by a set of patterns. The patterns are matched against the output of the CLIs. Therefore the human knowledge in this system is encoded in two places:

  1. The match patterns to detect the failure.
  2. The diagnostics to collect once the failure has been detected.

(2) will come in handy because we can implement an AI based backend that can examine the diagnostics and previde possible solutions.

Complex Rules

These rules process two types of resource types together and perform a check on them. E.g.

    {
        "category": "Storage",
        "cmd": "kubectl get pv -o json",
        "pattern": "Check .status.phase for PVs linked to the node",
        "name": "Unbound PVs on Node",
        "severity": "Medium",
        "suite": "Nodes - other properties"
    },

This rule is processing two resource types: Node and PersistentVolume. In such cases, the framework needs to provide the right combination of (Node, PV). ie it should not be a NxM implementation, but rather use a filter to provide the right set of PV for given Node This filter should also be encoded in the rule.

This implementation can be simplified if we think in terms of a primary resource and a secondary resource. The rule is always applied on the primary resource, and the values of secondary resource are generated using a function with primary resource as input.

E.g. in the above case, the primary resource is Node and the secondary resource is PV. The function is getPVsForNode(Node).

This function can be implemented in the rule itself or can be specified in the main rule object. the benefit of specifying it in the main rule object is that the same function can be used by multiple rules. Also, the rule can just focus on the checks and not worry about how to get the secondary resource.

At this point, it feels like just having a primary and secondary input is enough and there is no need for a third input.

Crawler

Given an inventory and a set of rules (that subscribe to a given type of resource), run the rules on the inventory and return the results. The crawler also computes the correct set of inputs for complex rules as specified above by calling the generator function provided in the rule.

Formats

Picking this up from prowler

(unctl) ➜  prowler git:(master) cat providers/aws/services/eks/eks_endpoints_not_publicly_accessible/eks_endpoints_not_publicly_accessible.metadata.json
{
  "Provider": "aws",
  "CheckID": "eks_endpoints_not_publicly_accessible",
  "CheckTitle": "Ensure EKS Clusters are created with Private Endpoint Enabled and Public Access Disabled",
  "CheckType": [
    "Protect",
    "Secure network configuration",
    "Resources not publicly accessible"
  ],
  "ServiceName": "eks",
  "SubServiceName": "",
  "ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
  "Severity": "high",
  "ResourceType": "AwsEksCluster",
  "Description": "Ensure EKS Clusters are created with Private Endpoint Enabled and Public Access Disabled",
  "Risk": "Publicly accessible services could expose sensitive data to bad actors.",
  "RelatedUrl": "",
  "Remediation": {
    "Code": {
      "CLI": "aws eks update-cluster-config --region <region_name> --name <cluster_name> --resources-vpc-config endpointPublicAccess=false,endpointPrivateAccess=true,publicAccessCidrs=[\"123.123.123.123/32\"]",
      "NativeIaC": "",
      "Other": "https://github.com/cloudmatos/matos/tree/master/remediations/aws/eks/eks-disable-public-endpoint",
      "Terraform": ""
    },
    "Recommendation": {
      "Text": "Enable private access to the Kubernetes API server so that all communication between your nodes and the API server stays within your VPC. Disable internet access to the API server.",
      "Url": "https://docs.aws.amazon.com/eks/latest/userguide/infrastructure-security.html"
    }
  },
  "Categories": [
    "internet-exposed"
  ],
  "DependsOn": [],
  "RelatedTo": [],
  "Notes": ""
}

All fields are applicable and we should be able to use this completely (see file lib/check/models.py)

class Check_Metadata_Model(BaseModel):
    """Check Metadata Model"""
    Provider: str
    CheckID: str
    CheckTitle: str
    CheckType: list[str]
    ServiceName: str
    SubServiceName: str
    ResourceIdTemplate: str
    Severity: str
    ResourceType: str
    Description: str
    Risk: str
    RelatedUrl: str
    Remediation: Remediation
    Categories: list[str]
    DependsOn: list[str]
    RelatedTo: list[str]
    Notes: str
    # We set the compliance to None to
    # store the compliance later if supplied
    Compliance: list = None

For each check the return object is described below

@dataclass
class Check_Report:
    """Contains the Check's finding information."""

    status: str
    status_extended: str
    check_metadata: Check_Metadata_Model
    resource_details: str
    resource_tags: list

    def __init__(self, metadata):
        self.status = ""
        self.check_metadata = Check_Metadata_Model.parse_raw(metadata)
        self.status_extended = ""
        self.resource_details = ""
        self.resource_tags = []


@dataclass
class Check_Report_AWS(Check_Report):
    """Contains the AWS Check's finding information."""

    resource_id: str
    resource_arn: str
    region: str

    def __init__(self, metadata):
        super().__init__(metadata)
        self.resource_id = ""
        self.resource_arn = ""
        self.region = ""

The execution model is per check. ie the check is expanding the inventory and running the check on each item in the inventory. The inventory has been cached by the service framework

class ec2_instance_public_ip(Check):
    def execute(self):
        findings = []
        for instance in ec2_client.instances:
            if instance.state != "terminated":
                report = Check_Report_AWS(self.metadata())
                report.region = instance.region
                report.resource_arn = instance.arn
                report.resource_tags = instance.tags
                report.status = "PASS"
                report.status_extended = (
                    f"EC2 Instance {instance.id} does not have a Public IP."
                )
                report.resource_id = instance.id
                if instance.public_ip:
                    report.status = "FAIL"
                    report.status_extended = f"EC2 Instance {instance.id} has a Public IP: {instance.public_ip} ({instance.public_dns})."
                    report.resource_id = instance.id

                findings.append(report)

        return findings

Suggested file system layout

k8s-checks/
│
├── bin/
│   ├── k8s_checker.py          # Main entry script if needed
│
├── k8s_checks/
│   ├── __init__.py
│   ├── core/
│   │   ├── __init__.py
│   │   ├── executor.py         # Code to execute kubectl commands
│   │   ├── validator.py        # Schema validation logic
│   │   └── utils.py            # Utility functions
│   │
│   ├── inventories/
│   │   ├── __init__.py
│   │   ├── node_inventory.py
│   │   ├── pod_inventory.py
│   │   └── ...
│   │
│   ├── rules/
│   │   ├── __init__.py
│   │   ├── node_rules.py
│   │   ├── pod_rules.py
│   │   └── ...
│   │
│   └── tests/
│       ├── __init__.py
│       ├── test_executor.py
│       ├── test_validator.py
│       └── ...
│
├── schemas/
│   ├── inventory_schema.json
│   └── rules_schema.json
│
├── logs/                       # If you're logging results or errors
│
├── README.md
├── requirements.txt            # Python dependencies
└── setup.py                    # If you intend to package and distribute it

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

unctl-0.1.2.tar.gz (50.9 kB view hashes)

Uploaded Source

Built Distribution

unctl-0.1.2-py3-none-any.whl (67.2 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page