Skip to main content

Klime SDK for Python

Project description

klime

Klime SDK for Python.

Installation

pip install klime

Quick Start

from klime import KlimeClient

client = KlimeClient(
    write_key='your-write-key'
)

# Identify a user
client.identify('user_123', {
    'email': 'user@example.com',
    'name': 'Stefan'
})

# Track an event
client.track('Button Clicked', {
    'button_name': 'Sign up',
    'plan': 'pro'
}, user_id='user_123')

# Associate user with a group and set group traits
client.group('org_456', {
    'name': 'Acme Inc',
    'plan': 'enterprise'
}, user_id='user_123')

# Or just link the user to a group (if traits are already set)
client.group('org_456', user_id='user_123')

# Shutdown gracefully
client.shutdown()

Installation Prompt

Copy and paste this prompt into Cursor, Copilot, or your favorite AI editor to integrate Klime:

Integrate Klime for customer analytics. Klime tracks user activity to identify which customers are healthy vs at risk of churning.

ANALYTICS MODES (determine which applies):
- Companies & Teams: Your customers are companies with multiple team members (SaaS, enterprise tools)
  → Use identify() + group() + track()
- Individual Customers: Your customers are individuals with private accounts (consumer apps, creator tools)
  → Use identify() + track() only (no group() needed)

KEY CONCEPTS:
- Every track() call requires either user_id OR group_id (no anonymous events)
- Use group_id alone for org-level events (webhooks, cron jobs, system metrics)
- group() links a user to a company AND sets company traits (only for Companies & Teams mode)
- Order doesn't matter - events before identify/group still get attributed correctly

BEST PRACTICES:
- Initialize client ONCE at app startup (singleton pattern)
- Store write key in KLIME_WRITE_KEY environment variable
- Call shutdown() on process exit to flush remaining events (auto-registered via atexit)

Install: pip install klime

import os
from klime import KlimeClient

client = KlimeClient(write_key=os.environ['KLIME_WRITE_KEY'])

# Identify users at signup/login:
client.identify('usr_abc123', {'email': 'jane@acme.com', 'name': 'Jane Smith'})

# Track key activities:
client.track('Report Generated', {'report_type': 'revenue'}, user_id='usr_abc123')
client.track('Feature Used', {'feature': 'export', 'format': 'csv'}, user_id='usr_abc123')
client.track('Teammate Invited', {'role': 'member'}, user_id='usr_abc123')

# If Companies & Teams mode: link user to their company and set company traits
client.group('org_456', {'name': 'Acme Inc', 'plan': 'enterprise'}, user_id='usr_abc123')

INTEGRATION WORKFLOW:

Phase 1: Discover
Explore the codebase to understand:
1. What framework is used? (Django, Flask, FastAPI, Starlette, etc.)
2. Where is user identity available? (e.g., request.user.id, current_user.id, g.user, Depends() injection)
3. Is this Companies & Teams or Individual Customers?
   - Look for: organization, workspace, tenant, team, account models → Companies & Teams (use group())
   - No company/org concept, just individual users → Individual Customers (skip group())
4. Where do core user actions happen? (views, routes, API endpoints, services)
5. Is there existing analytics? (search: segment, posthog, mixpanel, amplitude, track)
Match your integration style to the framework's conventions.

Phase 2: Instrument
Add these calls using idiomatic patterns for the framework:
- Initialize client once (Django: apps.py/settings, Flask: app factory, FastAPI: lifespan/startup)
- identify() in auth/login success handler
- group() when user-org association is established (Companies & Teams mode only)
- track() for key user actions (see below)

WHAT TO TRACK:
Active engagement (primary): feature usage, resource creation, collaboration, completing flows
Session signals (secondary): login/session start, dashboard access - distinguishes "low usage" from "churned"
Do NOT track: every request, health checks, middleware passthrough, background tasks

Phase 3: Verify
Confirm: client initialized, shutdown handled, identify/group/track calls added

Phase 4: Summarize
Report what you added:
- Files modified and what was added to each
- Events being tracked (list event names and what triggers them)
- How user_id is obtained (and group_id if Companies & Teams mode)
- Any assumptions made or questions

API Reference

Constructor

KlimeClient(
    write_key: str,                    # Required: Your Klime write key
    endpoint: Optional[str] = None,    # Optional: API endpoint (default: https://i.klime.com)
    flush_interval: Optional[int] = None,      # Optional: Milliseconds between flushes (default: 2000)
    max_batch_size: Optional[int] = None,     # Optional: Max events per batch (default: 20, max: 100)
    max_queue_size: Optional[int] = None,      # Optional: Max queued events (default: 1000)
    retry_max_attempts: Optional[int] = None, # Optional: Max retry attempts (default: 5)
    retry_initial_delay: Optional[int] = None, # Optional: Initial retry delay in ms (default: 1000)
    flush_on_shutdown: Optional[bool] = None,  # Optional: Auto-flush on exit (default: True)
    logger: Optional[logging.Logger] = None,   # Optional: Custom logger (default: logging.getLogger("klime"))
    on_error: Optional[Callable] = None,       # Optional: Callback for batch failures
    on_success: Optional[Callable] = None      # Optional: Callback for successful sends
)

Methods

track(event: str, properties: Optional[Dict] = None, user_id: Optional[str] = None, group_id: Optional[str] = None) -> None

Track an event. Events can be attributed in two ways:

  • User events: Provide user_id to track user activity (most common)
  • Group events: Provide group_id without user_id for organization-level events
# User event (most common)
client.track('Button Clicked', {
    'button_name': 'Sign up',
    'plan': 'pro'
}, user_id='user_123')

# Group event (for webhooks, cron jobs, system events)
client.track('Events Received', {
    'count': 100,
    'source': 'webhook'
}, group_id='org_456')

Note: The group_id parameter can also be combined with user_id for multi-tenant scenarios where you need to specify which organization context a user event occurred in.

identify(user_id: str, traits: Optional[Dict] = None) -> None

Identify a user with traits.

client.identify('user_123', {
    'email': 'user@example.com',
    'name': 'Stefan'
})

group(group_id: str, traits: Optional[Dict] = None, user_id: Optional[str] = None) -> None

Associate a user with a group and/or set group traits.

# Associate user with a group and set group traits (most common)
client.group('org_456', {
    'name': 'Acme Inc',
    'plan': 'enterprise'
}, user_id='user_123')

# Just link a user to a group (traits already set or not needed)
client.group('org_456', user_id='user_123')

# Just update group traits (e.g., from a webhook or background job)
client.group('org_456', {
    'plan': 'enterprise',
    'employee_count': 50
})

flush() -> None

Manually flush queued events immediately.

client.flush()

shutdown() -> None

Gracefully shutdown the client, flushing remaining events.

client.shutdown()

queue_size() -> int

Return the number of events currently in the queue.

pending = client.queue_size()
print(f"{pending} events waiting to be sent")

Synchronous Methods

For cases where you need confirmation that events were sent (e.g., before process exit, in tests), use the synchronous variants:

track_sync(event, properties, user_id, group_id) -> BatchResponse

Track an event synchronously. Raises SendError on failure.

from klime import SendError

try:
    response = client.track_sync('Critical Action', {'key': 'value'}, user_id='user_123')
    print(f"Sent! Accepted: {response.accepted}")
except SendError as e:
    print(f"Failed to send: {e.message}")

identify_sync(user_id, traits) -> BatchResponse

Identify a user synchronously. Raises SendError on failure.

group_sync(group_id, traits, user_id) -> BatchResponse

Associate a user with a group synchronously. Raises SendError on failure.

Features

  • Automatic Batching: Events are automatically batched and sent every 2 seconds or when the batch size reaches 20 events
  • Automatic Retries: Failed requests are automatically retried with exponential backoff
  • Thread-Safe: Safe to use from multiple threads
  • Process Exit Handling: Automatically flushes events on process exit (via atexit)
  • Zero Dependencies: Uses only Python standard library

Performance

When you call track(), identify(), or group(), the SDK:

  1. Adds the event to a thread-safe queue (microseconds)
  2. Returns immediately without waiting for network I/O

Events are sent to Klime's servers in background threads. This means:

  • No network blocking: HTTP requests happen asynchronously in background threads
  • No latency impact: Tracking calls add < 1ms to your request handling time
  • Automatic batching: Events are queued and sent in batches (default: every 2 seconds or 20 events)
# This returns immediately - no HTTP request is made here
client.track('Button Clicked', {'button': 'signup'}, user_id='user_123')

# Your code continues without waiting
return {'success': True}

The only blocking operation is flush(), which waits for all queued events to be sent. This is typically only called during graceful shutdown.

Configuration

Default Values

  • flush_interval: 2000ms
  • max_batch_size: 20 events
  • max_queue_size: 1000 events
  • retry_max_attempts: 5 attempts
  • retry_initial_delay: 1000ms
  • flush_on_shutdown: True

Logging

The SDK uses Python's built-in logging module. By default, it logs to logging.getLogger("klime").

import logging

# Enable debug logging
logging.getLogger("klime").setLevel(logging.DEBUG)

# Or provide a custom logger
client = KlimeClient(
    write_key='your-write-key',
    logger=logging.getLogger("myapp.klime")
)

For Django/Flask, the SDK automatically integrates with your app's logging configuration.

Callbacks

def handle_error(error, events):
    # Report to your error tracking service
    sentry_sdk.capture_exception(error)
    print(f"Failed to send {len(events)} events: {error}")

def handle_success(response):
    print(f"Sent {response.accepted} events")

client = KlimeClient(
    write_key='your-write-key',
    on_error=handle_error,
    on_success=handle_success
)

Error Handling

The SDK automatically handles:

  • Transient errors (429, 503, network failures): Retries with exponential backoff
  • Permanent errors (400, 401): Logs error and drops event
  • Rate limiting: Respects Retry-After header

For synchronous operations, use *_sync() methods which raise SendError on failure:

from klime import KlimeClient, SendError

try:
    response = client.track_sync('Event', user_id='user_123')
except SendError as e:
    print(f"Failed: {e.message}, events: {len(e.events)}")

Size Limits

  • Maximum event size: 200KB
  • Maximum batch size: 10MB
  • Maximum events per batch: 100

Events exceeding these limits are rejected and logged.

Flask Example

from flask import Flask, request
from klime import KlimeClient

app = Flask(__name__)
client = KlimeClient(write_key='your-write-key')

@app.route('/api/button-clicked', methods=['POST'])
def button_clicked():
    client.track('Button Clicked', {
        'button_name': request.json.get('buttonName')
    }, user_id=request.json.get('userId'))

    return {'success': True}

# Graceful shutdown
import atexit
atexit.register(client.shutdown)

Django Example

from django.http import JsonResponse
from django.views import View
from klime import KlimeClient

client = KlimeClient(write_key='your-write-key')

class ButtonClickView(View):
    def post(self, request):
        client.track('Button Clicked', {
            'button_name': request.POST.get('buttonName')
        }, user_id=str(request.user.id))

        return JsonResponse({'success': True})

Requirements

  • Python 3.9 or higher
  • No external dependencies (uses only standard library)

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

klime-1.1.0.tar.gz (18.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

klime-1.1.0-py3-none-any.whl (11.4 kB view details)

Uploaded Python 3

File details

Details for the file klime-1.1.0.tar.gz.

File metadata

  • Download URL: klime-1.1.0.tar.gz
  • Upload date:
  • Size: 18.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.2

File hashes

Hashes for klime-1.1.0.tar.gz
Algorithm Hash digest
SHA256 48dcd48d8c2a351b3f447810b121ee59d55faaa0356d7e498ac56ea8c507e591
MD5 9b8fd8e6b09fde774fbdd71a6513102e
BLAKE2b-256 c66da4031b839a2720f56c921b32bf46ebf0ac7b590de5bf4ec93eb0f13fb6c5

See more details on using hashes here.

File details

Details for the file klime-1.1.0-py3-none-any.whl.

File metadata

  • Download URL: klime-1.1.0-py3-none-any.whl
  • Upload date:
  • Size: 11.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.2

File hashes

Hashes for klime-1.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 7f27e8f4b08407a037fb99f286bfa57fdb2e9e5c666e2cbdfbb7f48514f188d9
MD5 3ec45a2957d1f17d11de814b4b739f12
BLAKE2b-256 01fdf3552804405f13b26c272a930e3e58b0bc3c15430fca842cbba5363ba387

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page