CyClarity SDK

Introduction

CyClarity SDK is a Python package that provides an interface for interacting with the CyClarity platform. It includes classes and methods for creating, managing, and interacting with various resources on the platform.

Installation

You can install the CyClarity SDK using pip:

pip install cyclarity-sdk

Usage

Here are examples of how to use the classes in the CyClarity SDK.

Runnable

The Runnable class is a base class for creating objects that can be run with setup and teardown phases. It has setup, run, and teardown methods that need to be implemented. Here is an example:

from cyclarity_sdk.runnable import Runnable, BaseResultsModel

class MyResult(BaseResultsModel):
    res_str: str

class MyRunnable(Runnable[MyResult]):
    desc: str
    cli_args: str
    # self.platform_api: PlatformApi = PlatformApi() is initialized in a different stack.

    def setup(self):
        self.logger.info("Setting up")

    def run(self):
        self.logger.info("Running")
        self.platform_api.send_test_report_description("This is a test description")
        return MyResult(res_str="success!")

    def teardown(self, exception_type, exception_value, traceback):
        self.logger.info("Tearing down")

To use this Runnable, you would do:

input_args = {
    "desc": "test",
    "cli_args": "-as -fsd -dsd",
}

with MyRunnable(**input_args) as runnable_instance:  # noqa
    result: MyResult = runnable_instance()

    # Serialize the result model to a JSON string
    print("\ndictionary running results: ")
    print(result.model_dump_json())

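
The with statement above suggests that a Runnable acts as a context manager: setup runs on entry, calling the instance runs run, and teardown receives any exception details on exit. The following is a minimal sketch of that lifecycle, assuming this mapping. SketchRunnable, EchoRunnable, and the calls list are hypothetical stand-ins for illustration, not the SDK's actual implementation.

```python
import logging


class SketchRunnable:
    """Hypothetical stand-in for a Runnable-like base class (not the SDK's code)."""

    def __init__(self):
        self.logger = logging.getLogger(type(self).__name__)
        self.calls = []  # records the order of lifecycle phases

    def setup(self):
        raise NotImplementedError

    def run(self):
        raise NotImplementedError

    def teardown(self, exception_type, exception_value, traceback):
        raise NotImplementedError

    def __enter__(self):
        # Entering the with block triggers the setup phase.
        self.calls.append("setup")
        self.setup()
        return self

    def __call__(self):
        # Calling the instance triggers the run phase and returns its result.
        self.calls.append("run")
        return self.run()

    def __exit__(self, exception_type, exception_value, traceback):
        # Leaving the with block always triggers teardown, even after an error.
        self.calls.append("teardown")
        self.teardown(exception_type, exception_value, traceback)
        return False  # let exceptions propagate to the caller


class EchoRunnable(SketchRunnable):
    def setup(self):
        self.logger.info("Setting up")

    def run(self):
        self.logger.info("Running")
        return "success!"

    def teardown(self, exception_type, exception_value, traceback):
        self.logger.info("Tearing down")


with EchoRunnable() as runnable_instance:
    result = runnable_instance()

print(result)
print(runnable_instance.calls)
```

Returning False from __exit__ is a deliberate choice in this sketch: teardown still runs after a failure, but the exception is not swallowed.
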
PlatformApi

The PlatformApi class provides methods for interacting with the CyClarity platform. It is used within a Runnable instance through the self.platform_api attribute. Here are the methods it supports:

- send_test_report_description(description): Sends a test report description to the platform.
- send_finding(pt_finding): Sends a finding to the platform.
- send_execution_state(percentage, status, error_message): Sends the execution state to the platform.

Here is how you might use PlatformApi in a Runnable:

class MyRunnable(Runnable[MyResult]):
    desc: str
    cli_args: str

    def setup(self):
        self.logger.info("Setting up")

    def run(self):
        self.logger.info("Running")
        self.platform_api.send_test_report_description("This is a test description")
        return MyResult(res_str="success!")

    def teardown(self, exception_type, exception_value, traceback):
        self.logger.info("Tearing down")

For reference, the PlatformApi class is defined as follows:

from cyclarity_sdk.platform_api.Iplatform_connector import IPlatformConnectorApi
from cyclarity_sdk.platform_api.connectors.cli_connector import CliConnector
from cyclarity_sdk.sdk_models.artifacts import TestArtifact, TestReportDescription, ArtifactType
from cyclarity_sdk.sdk_models.findings import Finding, PTFinding, FindingModelType
from cyclarity_sdk.sdk_models import ExecutionState, ExecutionStatus
from clarity_common.models.common_models.models import MessageType
from typing import Optional

class PlatformApi:
    def __init__(self, platform_connector: Optional[IPlatformConnectorApi] = None):
        if not platform_connector:
            platform_connector = CliConnector()
        self.set_api(platform_connector)

    def set_api(self, platform_api: IPlatformConnectorApi):
        self.platform_connector = platform_api

    def send_test_report_description(self, description: str):
        execution_metadata = self.platform_connector.get_execution_meta_data()

        test_report_description = TestReportDescription(
            type=ArtifactType.REPORT_DESCRIPTION,
            description=description
        )

        artifact = TestArtifact(
            execution_metadata=execution_metadata,
            type=MessageType.TEST_ARTIFACT,
            data=test_report_description
        )

        return self.platform_connector.send_artifact(artifact)

    def send_finding(self, pt_finding: PTFinding):
        execution_metadata = self.platform_connector.get_execution_meta_data()
        
        finding = Finding(
            metadata=execution_metadata,
            model_type=FindingModelType.PT_FINDING,
            data=pt_finding,
            type=MessageType.FINDING
        )

        return self.platform_connector.send_finding(finding)

    def send_execution_state(self, percentage: int, status: ExecutionStatus,
                             error_message: Optional[str] = None):
        execution_metadata = self.platform_connector.get_execution_meta_data()

        execution_state = ExecutionState(
            execution_metadata=execution_metadata,
            percentage=percentage,
            status=status,
            error_message=error_message
        )

        return self.platform_connector.send_state(execution_state)

In the `PlatformApi` class, the `execution_metadata` is fetched using the `get_execution_meta_data()` method of the `platform_connector`.
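
Because the PlatformApi constructor accepts any connector object, a custom connector can be swapped in, for example to capture messages during tests. The sketch below assumes only the four connector method names that PlatformApi calls. ConnectorInterface, InMemoryConnector, send_description, and the execution_id metadata key are hypothetical stand-ins (plain dicts replace the SDK's pydantic models) so that the example runs without the SDK installed.

```python
from abc import ABC, abstractmethod


class ConnectorInterface(ABC):
    """Stand-in mirroring the method names of IPlatformConnectorApi."""

    @abstractmethod
    def send_artifact(self, test_artifact): ...

    @abstractmethod
    def send_finding(self, finding): ...

    @abstractmethod
    def send_state(self, execution_state): ...

    @abstractmethod
    def get_execution_meta_data(self): ...


class InMemoryConnector(ConnectorInterface):
    """Hypothetical connector that stores messages instead of sending them."""

    def __init__(self, metadata):
        self._metadata = dict(metadata)
        self.sent = []  # (kind, payload) tuples in the order they were sent

    def send_artifact(self, test_artifact):
        self.sent.append(("artifact", test_artifact))

    def send_finding(self, finding):
        self.sent.append(("finding", finding))

    def send_state(self, execution_state):
        self.sent.append(("state", execution_state))

    def get_execution_meta_data(self):
        return dict(self._metadata)  # copy, so callers cannot mutate our state


def send_description(connector, description):
    """Follow the same steps as send_test_report_description, using plain dicts."""
    execution_metadata = connector.get_execution_meta_data()
    artifact = {
        "execution_metadata": execution_metadata,
        "data": {"type": "report_description", "description": description},
    }
    return connector.send_artifact(artifact)


connector = InMemoryConnector({"execution_id": "demo-run"})  # hypothetical key
send_description(connector, "This is a test description")
kind, payload = connector.sent[0]
print(kind, payload["execution_metadata"])
```

With the real SDK, such a connector would presumably subclass IPlatformConnectorApi and be passed in as PlatformApi(platform_connector=...).
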

### Additional Classes Used by PlatformApi

The `PlatformApi` class utilizes several additional classes for its functionality, which are imported from various modules. Here are these classes along with their definitions:

```python
# The definitions below are collected from several SDK modules.
from abc import ABC, abstractmethod
from enum import Enum
from typing import Optional, Union

from pydantic import BaseModel, Field, computed_field, field_validator

from clarity_common.models.common_models.models import MessageType

# ExecutionMetadata, ScanGraph, FindingType, FindingStatus, RiskModel,
# CWEDatabase, and the assessment and risk enums used by PTFinding are
# defined elsewhere and are not shown here.

class IPlatformConnectorApi(ABC):
    @abstractmethod
    def send_artifact(self, test_artifact: TestArtifact):
        raise NotImplementedError(
            f'send_artifact was not defined for class {self.__class__.__name__}')  # noqa

    @abstractmethod
    def send_finding(self, finding: Finding):
        raise NotImplementedError(
            f'send_finding was not defined for class {self.__class__.__name__}')  # noqa

    @abstractmethod
    def send_state(self, execution_state: ExecutionState):
        raise NotImplementedError(
            f'send_state was not defined for class {self.__class__.__name__}')  # noqa

    @abstractmethod
    def get_execution_meta_data(self) -> ExecutionMetadata:
        raise NotImplementedError(
            f'get_execution_meta_data was not defined for class {self.__class__.__name__}')  # noqa

class TestArtifact(BaseModel):
    execution_metadata: ExecutionMetadata
    type: MessageType = MessageType.TEST_ARTIFACT
    data: Union[TestReportDescription, ScanGraph]

class TestReportDescription(BaseModel):
    type: ArtifactType
    description: str

class ArtifactType(str, Enum):
    '''WARNING: changing these strings can affect how results are visualized
    in the reports. The artifacts table is queried by these values, which are
    stored as strings in its subtype attribute.'''

    REPORT_DESCRIPTION = "report_description"
    SCAN_GRAPH = "scan_graph"

class Finding(BaseModel):
    metadata: ExecutionMetadata
    model_type: FindingModelType
    data: PTFinding
    type: MessageType = MessageType.FINDING

    @computed_field
    @property
    def subtype(self) -> FindingType:
        return self.data.type

class PTFinding(BaseModel):
    topic: str = Field(description="Subject")
    status: FindingStatus = Field(description="status of the finding")
    type: FindingType = Field(description="The type of the finding")
    assessment_category: AssessmentCategory = Field(AssessmentCategory.PENTEST, description="assessment category")  # noqa
    assessment_technique: AssessmentTechnique = Field(AssessmentTechnique.NETWORK_ANALYSIS, description="assessment technique")  # noqa
    purpose: str = Field(description="purpose of the test")
    description: str = Field(description="description")
    preconditions: Optional[str] = Field(None, description="precondition for the test")  # noqa
    steps: Optional[str] = Field(None, description="steps performed for executing the test")  # noqa
    threat: Optional[str] = Field(None, description="threat description")
    recommendations: Optional[str] = Field(None, description="recommendations")
    expertise: Optional[Expertise] = Field(None, description="expertise needed by the attack in order to manipulate it")  # noqa
    access: Optional[Access] = Field(None, description="access needed in order to perform this attack")  # noqa
    time: Optional[ElapsedTime] = Field(None, description="the estimated time it takes to execute the exploit")  # noqa
    equipment: Optional[Equipment] = Field(None, description="required equipment level needed in order to execute the exploit")  # noqa
    knowledge_of_target: Optional[KnowledgeOfTarget] = Field(None, description="")  # noqa
    cwe_number: Optional[int] = Field(None, description="cwe num")

    # Custom validator that checks if different fields are matching 'RiskModel'
    @field_validator('expertise', 'access', 'time', 'equipment',
                     'knowledge_of_target', mode="before")
    @classmethod
    def convert_enum_attributes_to_model(cls, v, info):
        """
        Convert enum values to pydantic model
        """
        field_to_enum_mapping = {
            'expertise': Expertise,
            'access': Access,
            'time': ElapsedTime,
            'equipment': Equipment,
            'knowledge_of_target': KnowledgeOfTarget
        }
        enum_class = field_to_enum_mapping.get(info.field_name)
        if not enum_class:
            raise ValueError(f"No enum class found for field "
                             f"{info.field_name}")
        if isinstance(v, dict):
            # Cover the case where the information is already a dict.
            return RiskModel(**v)
        if isinstance(v, str):
            try:
                return getattr(enum_class, v)
            except AttributeError:
                raise ValueError(f"{v} is not a valid value for enum class"
                                 f" {enum_class} and field {info.field_name}")
        return v

    @computed_field
    @property
    def cwe_description(self) -> str:
        try:
            cwe_db = CWEDatabase()
            weakness = cwe_db.get(self.cwe_number)
            return weakness.description
        except Exception:
            return 'N.A'  # not available

    @computed_field
    @property
    def sum(self) -> int:
        risk_sum = 0
        for field_name, field_value in self:
            if isinstance(field_value, Enum) and isinstance(
                    field_value.value, RiskModel):
                risk_sum += field_value.value.risk_value
        return risk_sum

    @computed_field
    @property
    def attack_difficulty(self) -> str:
        if self.type != FindingType.FINDING:
            return "None"
        elif self.sum < 14:
            return "Very Low"
        elif self.sum < 20:
            return "Low"
        elif self.sum < 25:
            return "Moderate"
        elif self.sum < 35:
            return "High"
        return "Very High"


class FindingModelType(str, Enum):
    PT_FINDING = "pt_finding"

class ExecutionState(BaseModel):
    '''Data structure to be sent via topic::execution-state'''
    execution_metadata: ExecutionMetadata
    percentage: int
    status: ExecutionStatus
    error_message: Optional[str]

class ExecutionStatus(str, Enum):
    TIMEOUT = "TIMEOUT"
    QUEUED = "QUEUED"
    RUNNING = "RUNNING"
    COMPLETED_SUCCESSFULLY = "COMPLETED"
    COMPLETED_WITH_ERROR = "FAILED"
    STOPPED = "STOPPED"
```
