A health check and RCA tool for kubernetes
Project description
Entities
Inventory
Given a resource type, gather the inventory of all the resources of that type. Schema is simple:
"items": {
"type": "object",
"properties": {
"name": {
"type": "string"
},
"id": {
"type": "string"
},
"command": {
"type": "string"
},
"matchPattern": {
"type": "string"
}
},
"required": ["id", "command", "matchPattern"],
"additionalProperties": false
}
Given a resource type, the inventory is a list of items. Each item has a name, id, command and matchPattern.
command
is the CLI to run to get the inventory. matchPattern
is the pattern to use to split the output of the CLI into individual items.
Rules
Schema provided below:
"items": {
"type": "object",
"properties": {
"name": {
"type": "string"
},
"category": {
"type": "string"
},
"severity": {
"type": "string",
"enum": ["High", "Medium", "Low"]
},
"inventoryId": {
"type": "string"
},
"cmd": {
"type": "string"
},
"primary_input_name": {
"type": "string"
},
"secondary_input_name": {
"type": "string"
},
"pattern": {
"type": "string"
},
"diags": {
"type": "string"
}
},
"required": ["name", "category", "severity", "inventoryId", "cmd", "primary_input_name", "pattern"],
"additionalProperties": false
}
invetoryId
is the id of the inventory to use for this rule.
cmd
is the CLI to run for the individual item.
pattern
is the pattern to establish return value of the rule.
diags
is the CLI to run to get the diagnostics for the rule in case of failures.
primary_input_name
is the name of the primary input to the rule. This is the resource type that the rule is subscribed to. In the CLI, it will likely be found in double curly braces. E.g. kubectl describe node {{node}} -o json
We are finding that this model does not fit well with doing something like
kubectl get pods | grep -v Running
. Because we are not longer working with a single resource, but rather performing a scan. And in this case the system wants to perform the scan itself. Single rules are relegated to just looking at state pertaining to a single resource. Even with a more compliant model, likekubectl describe pod {pod} | grep CrashLoopBackOff
is turning out to be painful to code. This is because we don't have a good way of grepping.
Conclusion
Our model of having a check examine a singular object cannot scale if we use kubectl
. This means that we have to go with the native python approach. In order to get value out of the LLM models however, the rule should include the kubectl
command that its implementing to accomplish its task. ie what kubectl command would you type to get the same information that the rule is checking.
This leads us to an interesting approach: write the kubectl command for all the checks we are implementing, and use LLM to generate the code for the command. Ofcourse we need to review the generate the code, but this creates a good lock-step mechanism that we can.
The alternative would be to move away from a per object check into a check that examines all the objects of the given type.
Sample interaction with interactive python below
>>> s = """
... {
... "category": "Configuration",
... "cmd": "kubectl describe nodes {{node}}",
... "pattern": "grep 'cpu|memory|disk'",
... "primary_input_name": "node",
... "name": "Resource Limits",
... "severity": "Medium",
... "suite": "Generic"
... }
... """
>>>
>>> s
'\n {\n "category": "Configuration",\n "cmd": "kubectl describe nodes {{node}}",\n "pattern": "grep \'cpu|memory|disk\'",\n "primary_input_name": "node",\n "name": "Resource Limits",\n "severity": "Medium",\n "suite": "Generic"\n }\n'
>>> import json
>>> json.loads(s)
{'category': 'Configuration', 'cmd': 'kubectl describe nodes {{node}}', 'pattern': "grep 'cpu|memory|disk'", 'primary_input_name': 'node', 'name': 'Resource Limits', 'severity': 'Medium', 'suite': 'Generic'}
>>> r=json.loads(s)
>>> r.get('cmd')
'kubectl describe nodes {{node}}'
>>> r.get('primary_input_name')
'node'
>>> r.get('cmd').replace('{{' + r.get('primary_input_name') + '}}', 'n25')
'kubectl describe nodes n25'
>>>
The crawler can use something like this to fill up the template variables with the values from inventory.
There is some duplication right now with the double curly and the primary_input_name
. Lets see if it can be simplified.
{
"category": "Configuration",
"cmd": "kubectl get pods --field-selector spec.nodeName=<NODE_NAME> -o json",
"pattern": "Check .spec.hostNetwork is true",
"name": "Misconfigured HostNetwork",
"severity": "Medium",
"suite": "Nodes - pod properties and errors"
},
Simple Rules
Subscribe to a given resource inventory, perform a check on it using
- One or more CLIs to enrich
- match the outputs of the CLIs in the previous step against a set of patterns
- on match, collect a diagnostics output
- return a boolean, diagnostics output
This is where the expert system is getting created. A rule is a specific failure signature that can be detected by a set of patterns. The patterns are matched against the output of the CLIs. Therefore the human knowledge in this system is encoded in two places:
- The match patterns to detect the failure.
- The diagnostics to collect once the failure has been detected.
(2) will come in handy because we can implement an AI based backend that can examine the diagnostics and previde possible solutions.
Complex Rules
These rules process two types of resource types together and perform a check on them. E.g.
{
"category": "Storage",
"cmd": "kubectl get pv -o json",
"pattern": "Check .status.phase for PVs linked to the node",
"name": "Unbound PVs on Node",
"severity": "Medium",
"suite": "Nodes - other properties"
},
This rule is processing two resource types: Node
and PersistentVolume
. In such cases, the framework needs to provide the right combination of (Node, PV)
.
ie it should not be a NxM implementation, but rather use a filter to provide the right set of PV
for given Node
This filter should also be encoded in the rule.
This implementation can be simplified if we think in terms of a primary resource and a secondary resource. The rule is always applied on the primary resource, and the values of secondary resource are generated using a function with primary resource as input.
E.g. in the above case, the primary resource is Node
and the secondary resource is PV
. The function is getPVsForNode(Node)
.
This function can be implemented in the rule itself or can be specified in the main rule object. the benefit of specifying it in the main rule object is that the same function can be used by multiple rules. Also, the rule can just focus on the checks and not worry about how to get the secondary resource.
At this point, it feels like just having a primary and secondary input is enough and there is no need for a third input.
Crawler
Given an inventory and a set of rules (that subscribe to a given type of resource), run the rules on the inventory and return the results. The crawler also computes the correct set of inputs for complex rules as specified above by calling the generator function provided in the rule.
Formats
Picking this up from prowler
(unctl) ➜ prowler git:(master) cat providers/aws/services/eks/eks_endpoints_not_publicly_accessible/eks_endpoints_not_publicly_accessible.metadata.json
{
"Provider": "aws",
"CheckID": "eks_endpoints_not_publicly_accessible",
"CheckTitle": "Ensure EKS Clusters are created with Private Endpoint Enabled and Public Access Disabled",
"CheckType": [
"Protect",
"Secure network configuration",
"Resources not publicly accessible"
],
"ServiceName": "eks",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "high",
"ResourceType": "AwsEksCluster",
"Description": "Ensure EKS Clusters are created with Private Endpoint Enabled and Public Access Disabled",
"Risk": "Publicly accessible services could expose sensitive data to bad actors.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "aws eks update-cluster-config --region <region_name> --name <cluster_name> --resources-vpc-config endpointPublicAccess=false,endpointPrivateAccess=true,publicAccessCidrs=[\"123.123.123.123/32\"]",
"NativeIaC": "",
"Other": "https://github.com/cloudmatos/matos/tree/master/remediations/aws/eks/eks-disable-public-endpoint",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable private access to the Kubernetes API server so that all communication between your nodes and the API server stays within your VPC. Disable internet access to the API server.",
"Url": "https://docs.aws.amazon.com/eks/latest/userguide/infrastructure-security.html"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
All fields are applicable and we should be able to use this completely (see file lib/check/models.py)
class Check_Metadata_Model(BaseModel):
"""Check Metadata Model"""
Provider: str
CheckID: str
CheckTitle: str
CheckType: list[str]
ServiceName: str
SubServiceName: str
ResourceIdTemplate: str
Severity: str
ResourceType: str
Description: str
Risk: str
RelatedUrl: str
Remediation: Remediation
Categories: list[str]
DependsOn: list[str]
RelatedTo: list[str]
Notes: str
# We set the compliance to None to
# store the compliance later if supplied
Compliance: list = None
For each check the return object is described below
@dataclass
class Check_Report:
"""Contains the Check's finding information."""
status: str
status_extended: str
check_metadata: Check_Metadata_Model
resource_details: str
resource_tags: list
def __init__(self, metadata):
self.status = ""
self.check_metadata = Check_Metadata_Model.parse_raw(metadata)
self.status_extended = ""
self.resource_details = ""
self.resource_tags = []
@dataclass
class Check_Report_AWS(Check_Report):
"""Contains the AWS Check's finding information."""
resource_id: str
resource_arn: str
region: str
def __init__(self, metadata):
super().__init__(metadata)
self.resource_id = ""
self.resource_arn = ""
self.region = ""
The execution model is per check. ie the check is expanding the inventory and running the check on each item in the inventory. The inventory has been cached by the service framework
class ec2_instance_public_ip(Check):
def execute(self):
findings = []
for instance in ec2_client.instances:
if instance.state != "terminated":
report = Check_Report_AWS(self.metadata())
report.region = instance.region
report.resource_arn = instance.arn
report.resource_tags = instance.tags
report.status = "PASS"
report.status_extended = (
f"EC2 Instance {instance.id} does not have a Public IP."
)
report.resource_id = instance.id
if instance.public_ip:
report.status = "FAIL"
report.status_extended = f"EC2 Instance {instance.id} has a Public IP: {instance.public_ip} ({instance.public_dns})."
report.resource_id = instance.id
findings.append(report)
return findings
Suggested file system layout
k8s-checks/
│
├── bin/
│ ├── k8s_checker.py # Main entry script if needed
│
├── k8s_checks/
│ ├── __init__.py
│ ├── core/
│ │ ├── __init__.py
│ │ ├── executor.py # Code to execute kubectl commands
│ │ ├── validator.py # Schema validation logic
│ │ └── utils.py # Utility functions
│ │
│ ├── inventories/
│ │ ├── __init__.py
│ │ ├── node_inventory.py
│ │ ├── pod_inventory.py
│ │ └── ...
│ │
│ ├── rules/
│ │ ├── __init__.py
│ │ ├── node_rules.py
│ │ ├── pod_rules.py
│ │ └── ...
│ │
│ └── tests/
│ ├── __init__.py
│ ├── test_executor.py
│ ├── test_validator.py
│ └── ...
│
├── schemas/
│ ├── inventory_schema.json
│ └── rules_schema.json
│
├── logs/ # If you're logging results or errors
│
├── README.md
├── requirements.txt # Python dependencies
└── setup.py # If you intend to package and distribute it
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.