Skip to main content

Aribot Security Platform SDK by Aristiun & Ayurak - Threat modeling, compliance, and cloud security APIs

Project description

Aribot Python SDK

Official Python SDK for the Aribot Security Platform by Aristiun & Ayurak.

PyPI Python License

Installation

pip install aribot

Quick Start

from aribot import Aribot

client = Aribot(api_key="your_api_key")

# Analyze architecture diagram for threats
result = client.threat_modeling.analyze_diagram("architecture.png")
print(f"Found {result['threat_count']} threats")

# Get detailed threats
threats = client.threat_modeling.get_threats(result['diagram_id'])
for threat in threats:
    print(f"[{threat['severity']}] {threat['title']}")

# Digital Twin - List cloud providers
providers = client.digital_twin.get_providers()
for p in providers:
    print(f"Provider: {p['name']} - {p['display_name']}")

# Economics - Get dashboard
dashboard = client.economics.get_dashboard()
print(f"Total Monthly Cost: ${dashboard['company_summary']['total_monthly']}")

# Red Team - Get methodologies
methodologies = client.redteam.get_methodologies()
for m in methodologies:
    print(f"Methodology: {m['name']}")

Features

  • Threat Modeling - Upload diagrams, detect components, identify threats
  • Compliance Scanning - ISO 27001, SOC2, GDPR, HIPAA, PCI-DSS, NIST
  • Cloud Security - Scan AWS, Azure, GCP for misconfigurations
  • Pipeline Security - SAST, SCA, secrets detection in CI/CD
  • Digital Twin - Cloud provider integration, resource discovery, health monitoring
  • Economics - Cost analysis, ROI calculations, market intelligence
  • Red Team - Attack simulations, methodologies, threat intelligence

API Reference

Threat Modeling

# Upload and analyze a diagram
result = client.threat_modeling.analyze_diagram(
    "architecture.png",
    analysis_depth="comprehensive",  # basic, comprehensive, detailed
    wait=True,                       # wait for analysis to complete
    timeout=300                      # max wait time in seconds
)

# List diagrams
diagrams = client.threat_modeling.list(page=1, limit=25)

# Get diagram details
diagram = client.threat_modeling.get(diagram_id)

# Get threats for a diagram
threats = client.threat_modeling.get_threats(diagram_id)

# Get detected components
components = client.threat_modeling.get_components(diagram_id)

# Run AI-powered analysis
ai_result = client.threat_modeling.analyze_with_ai(
    diagram_id,
    analysis_types=["attack_paths", "data_flow"]
)

# Delete a diagram
client.threat_modeling.delete(diagram_id)

# Get dashboard metrics
dashboard = client.threat_modeling.dashboard(period="month")

Compliance Scanning

# Run compliance scan
result = client.compliance.scan(
    diagram_id,
    standards=["ISO27001", "SOC2", "GDPR"],
    include_recommendations=True
)
print(f"Compliance score: {result['overall_score']}%")

# Get compliance report
report = client.compliance.get_report(diagram_id, format="json")

# List available standards
standards = client.compliance.list_standards()

# Get standard details
iso = client.compliance.get_standard("ISO27001")

# List controls for a standard
controls = client.compliance.list_controls("SOC2", category="access_control")

# Get compliance gaps
gaps = client.compliance.get_gaps(diagram_id, standard_id="ISO27001")

# Create custom standard
custom = client.compliance.add_custom_standard(
    name="Internal Security Policy",
    description="Company security requirements",
    controls=[
        {
            "id": "ISP-001",
            "name": "Data Encryption",
            "description": "All data must be encrypted at rest",
            "severity": "high"
        }
    ]
)

# Get compliance dashboard
dashboard = client.compliance.dashboard(period="quarter")

Cloud Security

# Run cloud security scan
scan = client.cloud.scan(
    project_id="123456789012",
    provider="aws",
    services=["iam", "s3", "ec2"],
    compliance_standards=["CIS-AWS"]
)

# Get scan results
scan = client.cloud.get_scan(scan_id)

# List scans
scans = client.cloud.list_scans(provider="aws", status="completed")

# Get findings
findings = client.cloud.get_findings(
    scan_id,
    severity="critical",
    service="s3"
)

# Connect AWS account
account = client.cloud.connect_account(
    provider="aws",
    credentials={
        "role_arn": "arn:aws:iam::123456789012:role/AribotSecurityRole",
        "external_id": "your-external-id"
    },
    name="Production AWS"
)

# Connect GCP project
account = client.cloud.connect_account(
    provider="gcp",
    credentials={
        "service_account_key": "{ ... }",
        "project_id": "my-project-123"
    }
)

# Connect Azure subscription
account = client.cloud.connect_account(
    provider="azure",
    credentials={
        "tenant_id": "...",
        "client_id": "...",
        "client_secret": "..."
    }
)

# List connected accounts
accounts = client.cloud.list_accounts(provider="aws")

# Get remediation steps
remediation = client.cloud.get_remediation(finding_id)

# Resolve a finding
client.cloud.resolve_finding(
    finding_id,
    resolution="fixed",
    notes="Patched in deployment v1.2.3"
)

# Suppress a finding
client.cloud.suppress_finding(
    finding_id,
    reason="Accepted risk per security review",
    duration_days=90
)

# Get cloud security dashboard
dashboard = client.cloud.dashboard(project_id="123456789012")

Pipeline Security

# Create a project
project = client.pipeline.create_project(
    name="my-api",
    repository_url="https://github.com/org/my-api",
    scan_types=["sast", "sca", "secrets"]
)

# Run security scan
result = client.pipeline.scan(
    project_id,
    commit_sha="abc123def456",
    branch="main",
    scan_types=["sast", "sca", "secrets"],
    fail_on_severity="high",
    wait=True
)

if result['status'] == 'failed':
    print("Security gate failed!")
    for finding in result['blocking_findings']:
        print(f"  [{finding['severity']}] {finding['title']}")

# Get scan details
scan = client.pipeline.get_scan(scan_id)

# Get specific finding types
sast_findings = client.pipeline.get_sast_findings(scan_id)
sca_findings = client.pipeline.get_sca_findings(scan_id)
secrets = client.pipeline.get_secrets_findings(scan_id)

# Configure security gates
client.pipeline.configure_gates(
    project_id,
    gates={
        "fail_on_critical": True,
        "fail_on_high": True,
        "max_high_findings": 5,
        "block_secrets": True,
        "required_scan_types": ["sast", "secrets"]
    }
)

# Set baseline (suppress existing findings)
client.pipeline.add_baseline(project_id, scan_id)

# Suppress a finding
client.pipeline.suppress_finding(
    finding_id,
    reason="False positive - validated manually"
)

# Get pipeline dashboard
dashboard = client.pipeline.dashboard(project_id=project_id)

Digital Twin

# Get cloud providers
providers = client.digital_twin.get_providers()
# Returns: [{'id': '...', 'name': 'aws', 'display_name': 'Amazon Web Services', 'connected': False}, ...]

# Get provider health
health = client.digital_twin.get_health()
# Returns: {'status': 'ok', 'configured': True, 'infrastructure': {...}, 'capabilities': [...]}

# Get analytics
analytics = client.digital_twin.get_analytics()
# Returns: {'configured': True, 'graph_statistics': {...}, 'entities_by_type': {...}}

# Get resources
resources = client.digital_twin.get_resources(limit=50)
# Returns: [{'id': '...', 'name': 'my-bucket', 'provider': 'aws', 'resource_type': 's3'}, ...]

# Sync resources from provider
result = client.digital_twin.sync_resources(provider_id='aws-123')
# Returns: {'status': 'syncing', 'resources_found': 150}

# Discover new resources
discovery = client.digital_twin.discover_resources(provider_id='aws-123')
# Returns: {'status': 'discovered', 'new_resources': 25}

Economics

# Get economics dashboard
dashboard = client.economics.get_dashboard(period='month')
# Returns: {'success': True, 'company_summary': {'total_monthly': 5000, 'total_annual': 60000}, ...}

# Get diagram cost analysis
cost = client.economics.get_diagram_cost_analysis(diagram_id)
# Returns: {'monthly_cost': 1500, 'annual_cost': 18000, 'cost_breakdown': [...]}

# Get component cost
component_cost = client.economics.get_component_cost(component_id)
# Returns: {'component': 'EC2 Instance', 'monthly_cost': 200, 'recommendations': [...]}

# Get economic intelligence (includes Strategic Cost Optimization)
intel = client.economics.get_economic_intelligence()
# Returns: {'status': 'success', 'provider': 'aws', 'pricing': {...},
#           'strategic_optimization': {'current': {...}, 'previous': {...}}}

# Refresh Strategic Cost Optimization recommendations (AI-powered)
# Moves current recommendations to "previous" and generates new ones
refresh = client.economics.refresh_recommendations()
# Returns: {'status': 'success', 'message': 'Recommendations refreshed',
#           'current': {...}, 'previous': {...}}

# Get market intelligence
market = client.economics.get_market_intelligence()
# Returns: {'trends': [...], 'benchmarks': {...}, 'recommendations': [...]}

# Calculate ROI
roi = client.economics.calculate_roi(
    investment=100000,
    risks_addressed=['risk-1', 'risk-2'],
    timeframe_days=365
)
# Returns: {'roi_percentage': 250, 'npv': 150000, 'payback_months': 8}

Red Team

# Get threat modeling methodologies
methodologies = client.redteam.get_methodologies()
# Returns: [{'id': 'stride', 'name': 'STRIDE', 'description': '...'}, ...]

# Get simulations
simulations = client.redteam.get_simulations(limit=10)
# Returns: [{'id': '...', 'name': 'APT29 Simulation', 'status': 'completed'}, ...]

# Get threat intelligence
intel = client.redteam.get_intelligence()
# Returns: {'threats': [...], 'indicators': [...], 'campaigns': [...]}

# Generate attack paths for a diagram
paths = client.redteam.generate_attack_paths(
    diagram_id,
    scope='single',
    include_compliance=True
)
# Returns: {'status': 'success', 'paths': [{'title': '...', 'risk_score': 85, 'steps': [...]}]}

# Get security requirements
requirements = client.redteam.get_security_requirements(diagram_id)
# Returns: [{'id': '...', 'requirement': '...', 'priority': 'high'}, ...]

Severity Assignment (AI-powered)

# Estimate AI cost for severity assignment
estimate = client.compliance.estimate_severity_cost(
    scan_id=scan_id,
    account_id=account_id
)
# Returns: {'estimated_tokens': 15000, 'estimated_cost_usd': 0.45, 'violations_count': 150}

# Assign severity using AI (automatically analyzes all violations)
result = client.compliance.assign_severity_ai(
    scan_id=scan_id,
    account_id=account_id,
    model='claude-3-sonnet',  # or 'gpt-4', 'gemini-pro'
    batch_size=50
)
# Returns: {'status': 'completed', 'processed': 150, 'updated': 142, 'errors': 0}

# Manually assign severity to violations
result = client.compliance.assign_severity_manual(
    violation_ids=['v-123', 'v-456'],
    severity='high'  # critical, high, medium, low, info
)
# Returns: {'updated': 2, 'violations': [...]}

Scanner Rules

# List scanner rules
rules = client.compliance.list_scanner_rules(
    severity='critical',
    provider='aws'
)
# Returns: [{'id': '...', 'name': 'S3 Public Access', 'severity': 'critical'}, ...]

# Get scanner rule statistics
stats = client.compliance.get_scanner_statistics()
# Returns: {'total_rules': 500, 'by_severity': {...}, 'by_provider': {...}}

# Sync rules from cloud providers
sync_result = client.compliance.sync_scanner_rules(
    providers=['aws', 'azure', 'gcp']
)
# Returns: {'synced': 150, 'new': 25, 'updated': 10}

# Create custom scanner rule
rule = client.compliance.create_scanner_rule(
    name='Custom S3 Encryption Check',
    description='Ensure all S3 buckets have encryption enabled',
    severity='high',
    provider='aws',
    resource_type='s3_bucket',
    condition={
        'field': 'encryption.enabled',
        'operator': 'equals',
        'value': True
    }
)

Dynamic Cloud Scanning

# Execute dynamic scan on cloud account
scan = client.compliance.execute_dynamic_scan(
    account_id=account_id,
    scan_type='full',  # full, quick, targeted
    resources=['ec2', 's3', 'iam'],
    standards=['CIS-AWS', 'SOC2']
)
# Returns: {'scan_id': '...', 'status': 'running', 'estimated_duration': 300}

# Execute unified scan with flexible scope
scan = client.compliance.execute_unified_scan(
    scope='account',  # account, standard, diagram
    scope_id=account_id,
    include_remediation=True
)
# Returns: {'scan_id': '...', 'status': 'queued'}

Remediation Execution

# Preview remediation before execution
preview = client.compliance.preview_remediation(
    policy_id=policy_id,
    account_id=account_id
)
# Returns: {'actions': [...], 'risk_level': 'low', 'affected_resources': 5}

# Execute remediation
result = client.compliance.execute_remediation(
    policy_id=policy_id,
    account_id=account_id,
    dry_run=False,
    auto_approve=True
)
# Returns: {'status': 'completed', 'resources_fixed': 5, 'rollback_available': True}

AI Agents & Self-Healing

# Get AI agent status
status = client.ai_agents.get_status()
# Returns: {'active': True, 'agents': [...], 'capabilities': [...]}

# Run specialist agent analysis
analysis = client.ai_agents.run_specialist(
    agent_type='security',  # security, compliance, cost, architecture
    diagram_id=diagram_id
)

# Self-healing operations
healing_status = client.ai_agents.get_self_healing_status()
# Returns: {'enabled': True, 'recent_actions': [...], 'autonomy_level': 'supervised'}

# Get autonomy stats
stats = client.ai_agents.get_autonomy_stats()
# Returns: {'total_remediations': 150, 'auto_approved': 120, 'manual_review': 30}

# Trigger remediation
remediation = client.ai_agents.trigger_remediation(
    finding_id=finding_id,
    auto_approve=False
)

# Approve/reject remediation action
client.ai_agents.approve_remediation(remediation_id)
client.ai_agents.reject_remediation(remediation_id, reason='Risk too high')

# Rollback remediation
client.ai_agents.rollback_remediation(remediation_id)

# Emergency stop all autonomous actions
client.ai_agents.emergency_stop()

# Resume autonomous operations
client.ai_agents.resume_operations()

Security Co-Pilot

# Get security co-pilot status
status = client.security_copilot.get_status()
# Returns: {'enabled': True, 'mode': 'supervised', 'active_threats': 5}

# Get pending actions awaiting approval
actions = client.security_copilot.get_pending_actions()
# Returns: [{'id': '...', 'type': 'patch', 'resource': '...', 'risk': 'low'}, ...]

# Get action history
history = client.security_copilot.get_action_history(limit=50)

# Get active threats
threats = client.security_copilot.get_active_threats()

# Approve/reject action
client.security_copilot.approve_action(action_id)
client.security_copilot.reject_action(action_id)

# Rollback action
client.security_copilot.rollback_action(action_id)

# Trigger security scan
scan = client.security_copilot.trigger_scan(scope='full')

# Update settings
client.security_copilot.update_settings(
    auto_remediation=True,
    max_risk_level='medium'
)

# Toggle co-pilot on/off
client.security_copilot.toggle(enabled=True)

Presets Import

# Import compliance presets
presets = client.compliance.import_presets(
    provider='aws',  # aws, azure, gcp, all
    categories=['security', 'cost', 'operations']
)
# Returns: {'imported': 50, 'standards': [...], 'rules': [...]}

Strategic Remediation Plan (AI-powered)

# Generate strategic remediation plan for a severity level
# Uses AI to analyze all violations and create a comprehensive plan
plan = client.compliance.generate_strategic_plan(
    scan_id=scan_id,
    account_id=account_id,
    severity='critical'  # critical, high, medium, low
)
# Returns: {
#   'id': 'uuid',
#   'severity': 'critical',
#   'status': 'generated',
#   'overview': 'Strategic overview of all critical violations...',
#   'root_causes': [{'cause': '...', 'theme': '...', 'impact': '...'}],
#   'grouped_violations': [{'policy': '...', 'violations': [...]}],
#   'high_impact_actions': [{'action': '...', 'impact_score': 95}],
#   'remediation_phases': [{'phase': 1, 'duration': '1 week', 'actions': [...]}],
#   'success_metrics': [{'metric': '...', 'target': '...'}],
#   'estimated_total_effort': '2-3 weeks',
#   'cost_savings_finops': {'monthly': 5000, 'annual': 60000},
#   'cost_savings_risk': {'risk_reduction': '85%', 'avoided_incidents': 12}
# }

# Get existing strategic plan
existing = client.compliance.get_strategic_plan(plan_id)

# List strategic plans for an account
plans = client.compliance.list_strategic_plans(
    account_id=account_id,
    severity='critical'
)

Mitigation Plan (AI-powered)

# Get existing mitigation plan for a diagram
plan = client.threat_modeling.get_mitigation_plan(diagram_id)
if plan:
    print(f"Overview: {plan['plan']['overview']}")
    print(f"Recommendations: {len(plan['plan']['recommendations'])}")
    for rec in plan['plan']['recommendations']:
        print(f"  [{rec['rank']}] {rec['title']} - {rec['priority']}")

# Generate new mitigation plan (AI-powered, uses chunked processing)
plan = client.threat_modeling.generate_mitigation_plan(
    diagram_id,
    force_regenerate=True  # Force regenerate even if cached
)
# Returns comprehensive plan with:
# - overview: Strategic summary of all threats
# - recommendations: Ranked list with code snippets
# - root_causes: Identified patterns across threats
# - remediation_phases: Phased approach with timelines
# - success_metrics: Measurable criteria
# - metadata: AI provider, generation time, etc.

print(f"Generated in {plan['plan']['metadata']['generation_time_ms']}ms")
print(f"Threats analyzed: {plan['plan']['metadata']['threat_count']}")

# Access recommendations with code snippets
for rec in plan['plan']['recommendations']:
    print(f"[{rec['priority']}] {rec['title']}")
    print(f"  Impact: {rec['impact']}")
    print(f"  Effort: {rec['effort']}")
    print(f"  Affected threats: {', '.join(rec.get('affected_threats', []))}")
    if rec.get('code_snippet'):
        print(f"  Code ({rec['code_language']}):")
        print(f"    {rec['code_snippet']}")

# View root causes (patterns identified across multiple threats)
for cause in plan['plan']['root_causes']:
    print(f"Root cause: {cause['cause']}")
    print(f"  Theme: {cause['theme']}, Impact: {cause['impact']}")

# View phased remediation plan
for phase in plan['plan']['remediation_phases']:
    print(f"Phase: {phase['phase']} ({phase['duration']})")
    print(f"  Actions: {', '.join(phase['actions'])}")
    print(f"  Threats resolved: {phase['threats_resolved']}")

# Update plan with manual edits
updated_plan = plan['plan'].copy()
updated_plan['recommendations'][0]['priority'] = 'immediate'
client.threat_modeling.update_mitigation_plan(diagram_id, updated_plan)

Error Handling

from aribot import (
    Aribot,
    AribotError,
    AuthenticationError,
    RateLimitError,
    ValidationError,
    NotFoundError,
    ServerError
)

client = Aribot(api_key="your_api_key")

try:
    result = client.threat_modeling.analyze_diagram("diagram.png")
except AuthenticationError:
    print("Invalid API key")
except RateLimitError as e:
    print(f"Rate limited. Retry after {e.retry_after} seconds")
except ValidationError as e:
    print(f"Invalid request: {e.errors}")
except NotFoundError:
    print("Resource not found")
except ServerError:
    print("Server error - try again later")
except AribotError as e:
    print(f"API error: {e.message}")

Configuration

# Custom base URL (for on-premise deployments)
client = Aribot(
    api_key="your_api_key",
    base_url="https://aribot.internal.company.com/api",
    timeout=60
)

# Check API health
health = client.health()

# Get current user info
user = client.me()

# Get usage stats
usage = client.usage(period="month")
print(f"API calls used: {usage['calls_used']}/{usage['calls_limit']}")

CI/CD Integration

GitHub Actions

- name: Security Scan
  env:
    AYURAK_API_KEY: ${{ secrets.AYURAK_API_KEY }}
  run: |
    pip install aribot
    python -c "
    from aribot import Aribot
    client = Aribot(api_key='$AYURAK_API_KEY')
    result = client.pipeline.scan(
        project_id='${{ vars.PROJECT_ID }}',
        commit_sha='${{ github.sha }}',
        fail_on_severity='high',
        wait=True
    )
    if result['status'] == 'failed':
        exit(1)
    "

GitLab CI

security_scan:
  script:
    - pip install aribot
    - python scripts/security_scan.py
  variables:
    AYURAK_API_KEY: $AYURAK_API_KEY

Support

Changelog

v1.5.0

  • Added Strategic Remediation Plan API (client.compliance.generate_strategic_plan, get_strategic_plan, list_strategic_plans) - AI-powered strategic planning for compliance violations with cost savings analysis
  • Added Economic Intelligence Refresh (client.economics.refresh_recommendations) - Refresh AI-powered cost optimization recommendations
  • Added Severity Assignment API (client.compliance.estimate_severity_cost, assign_severity_ai, assign_severity_manual)
  • Added Scanner Rules API (client.compliance.list_scanner_rules, get_scanner_statistics, sync_scanner_rules, create_scanner_rule)
  • Added Dynamic Cloud Scanning (client.compliance.execute_dynamic_scan, execute_unified_scan)
  • Added Remediation Execution (client.compliance.preview_remediation, execute_remediation)
  • Added AI Agents & Self-Healing module (client.ai_agents) with autonomous remediation
  • Added Security Co-Pilot module (client.security_copilot) for supervised security operations
  • Added Presets Import (client.compliance.import_presets)
  • Added Mitigation Plan API (client.threat_modeling.get_mitigation_plan, generate_mitigation_plan, update_mitigation_plan) - AI-powered strategic remediation planning with chunked processing for large threat sets

v1.4.0

  • Updated base URL to api.aribot.ayurak.com
  • Added AI module (client.ai) - usage, quota, models, configure, analyze, queue status
  • Added SBOM module (client.sbom) - document management and vulnerability scanning
  • Added Dashboard module (client.dashboard) - overview, recent activity, risk summary
  • Added FinOps module (client.finops) - cost optimization recommendations and tracking
  • Added Marketplace module (client.marketplace) - templates, categories, featured content
  • Added API Keys module (client.api_keys) - key listing and revocation

v1.1.0

  • Added Digital Twin, Economics, and Red Team modules
  • Added Remediation module

v1.0.0

  • Initial release with Threat Modeling, Compliance, Cloud Security, and Pipeline modules

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aribot-2.0.1.tar.gz (25.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

aribot-2.0.1-py3-none-any.whl (35.4 kB view details)

Uploaded Python 3

File details

Details for the file aribot-2.0.1.tar.gz.

File metadata

  • Download URL: aribot-2.0.1.tar.gz
  • Upload date:
  • Size: 25.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.0

File hashes

Hashes for aribot-2.0.1.tar.gz
Algorithm Hash digest
SHA256 d7c563a008f7d5f30421d13c1d6608517a47aebb1628670fd290d73819c95499
MD5 f16df921e2b04e9d7d021ae218e189ba
BLAKE2b-256 31555a2c78fb655c6024b5cdd6ec8de2f39f382df9b09c7c1b87a0756a574ba5

See more details on using hashes here.

File details

Details for the file aribot-2.0.1-py3-none-any.whl.

File metadata

  • Download URL: aribot-2.0.1-py3-none-any.whl
  • Upload date:
  • Size: 35.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.0

File hashes

Hashes for aribot-2.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 bcd3234c842485eba68e44fe1b8f0775ee23516df395a7515b12c8b59a8e5da8
MD5 c326012b9cdbfd3b0084b5dd6069e361
BLAKE2b-256 e3b544f0387cb3f068653fe23031f458c7a23a7698939bc2406140f055519592

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page