
Kafka-based distributed task processing SDK


Ninja Kafka SDK

Simple SDK for distributed task processing with Kafka messaging.

Send tasks to Ninja services and get results back with a single execute_task call instead of managing a complex Kafka setup yourself.

🚀 Quick Start - Send Your First Task

import asyncio

from ninja_kafka_sdk import NinjaClient

async def main():
    # Configure your Kafka connection
    client = NinjaClient(
        kafka_servers="your-kafka-servers:9092",
        consumer_group="your-service-name"
    )

    try:
        # Send task and wait for result
        result = await client.execute_task(
            task="data_processing",
            account_id=123,
            email="user@example.com"
        )

        if result.is_success:
            print("✅ Task completed successfully!")
            print(f"Result data: {result.data}")
        else:
            print(f"❌ Failed: {result.error_message}")
    finally:
        # Always release the Kafka connections
        client.stop()

asyncio.run(main())

📦 Installation

# Option 1: install from PyPI
pip install ninja-kafka-sdk

# Option 2: copy the package into your project
cp -r ninja_kafka_sdk/ /your/project/

# ...then install its dependency (also listed in requirements.txt)
pip install kafka-python

⚙️ Configuration

Explicit Configuration (Recommended)

from ninja_kafka_sdk import NinjaClient

client = NinjaClient(
    kafka_servers="your-kafka-servers:9092",  # Change for your environment
    consumer_group="your-service-name"        # Unique name for your service
)

Configuration from Variables

import os
from ninja_kafka_sdk import NinjaClient

# Read from your application configuration
kafka_servers = os.getenv('MY_KAFKA_SERVERS', 'localhost:9092')
consumer_group = os.getenv('MY_CONSUMER_GROUP', 'my-service')

client = NinjaClient(
    kafka_servers=kafka_servers,
    consumer_group=consumer_group
)

Configuration with Config Object

from ninja_kafka_sdk import NinjaClient, NinjaKafkaConfig

# Create configuration object
config = NinjaKafkaConfig(
    kafka_servers="b-1.msk-cluster.amazonaws.com:9092,b-2.msk-cluster.amazonaws.com:9092",
    consumer_group="my-service",
    environment="stage",
    tasks_topic="ninja-tasks",
    results_topic="ninja-results"
)

# Use with client
client = NinjaClient(config=config)

Auto-Detection Fallback

If no explicit configuration is provided, the SDK tries to auto-detect its Kafka configuration from the following sources, in order:

  1. Environment variables (KAFKA_CONNECTION)
  2. Local configuration files (app/local.py)
  3. AWS metadata (if running on EC2)
  4. Smart defaults (localhost:9092 for local development)
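The fallback order above is a first-match-wins chain. A minimal sketch, assuming each source is a callable that returns a server string or None; resolve_kafka_servers and its read_local_config / read_aws_metadata hooks are hypothetical names, not the SDK's internals, and how the SDK actually reads app/local.py or EC2 metadata is not shown in this README.

```python
import os
from typing import Callable, Mapping, Optional

# A source returns a bootstrap-server string, or None if it has nothing to offer.
Source = Callable[[], Optional[str]]


def resolve_kafka_servers(
    env: Optional[Mapping[str, str]] = None,
    read_local_config: Source = lambda: None,  # hypothetical hook for app/local.py
    read_aws_metadata: Source = lambda: None,  # hypothetical hook for EC2 metadata
) -> str:
    """Return the first non-empty value from the sources, in priority order."""
    env = os.environ if env is None else env
    sources = (
        lambda: env.get("KAFKA_CONNECTION"),  # 1. environment variable
        read_local_config,                     # 2. local configuration file
        read_aws_metadata,                     # 3. AWS metadata when running on EC2
    )
    for source in sources:
        value = source()
        if value:
            return value
    return "localhost:9092"                    # 4. local development default


if __name__ == "__main__":
    print(resolve_kafka_servers())
```

Keeping each source behind a callable means later sources (such as a network call to EC2 metadata) are only tried when earlier ones come up empty.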

💡 How to Send Tasks

Basic Task Execution

from ninja_kafka_sdk import NinjaClient

async def verify_linkedin_account():
    # Explicit configuration for production
    client = NinjaClient(
        kafka_servers="b-1.msk-cluster.amazonaws.com:9092,b-2.msk-cluster.amazonaws.com:9092",
        consumer_group="auto-login-service",
        environment="prod"
    )
    
    try:
        # Send task and wait for result (one method call)
        result = await client.execute_task(
            task="linkedin_verification",
            account_id=12345,
            email="user@example.com",
            timeout=300  # 5 minutes
        )
        
        if result.is_success:
            print("✅ Verification successful!")
            return result.cookies
        else:
            print(f"❌ Failed: {result.error_message}")
            return None
            
    finally:
        client.stop()
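execute_task sends a task and waits for the result carrying the same correlation_id. A minimal in-memory sketch of that request/reply pattern, assuming results are matched by correlation ID as the message models below suggest; CorrelatingRequester, fake_worker and demo are hypothetical, and an asyncio.Queue stands in for the Kafka tasks topic.

```python
import asyncio
import uuid
from typing import Dict


class CorrelatingRequester:
    """Hypothetical in-memory stand-in for send-then-wait over Kafka topics."""

    def __init__(self) -> None:
        self.outbox: asyncio.Queue = asyncio.Queue()  # stands in for the tasks topic
        self._pending: Dict[str, asyncio.Future] = {}

    async def execute(self, task: str, timeout: float, **params) -> dict:
        # Tag the request with a fresh correlation ID and register a future for it.
        correlation_id = str(uuid.uuid4())
        future = asyncio.get_running_loop().create_future()
        self._pending[correlation_id] = future
        await self.outbox.put({"task": task, "correlation_id": correlation_id, **params})
        try:
            # Raises asyncio.TimeoutError if no matching result arrives in time.
            return await asyncio.wait_for(future, timeout)
        finally:
            self._pending.pop(correlation_id, None)

    def on_result(self, message: dict) -> None:
        # Called for every message read from the results topic; results for
        # unknown or already-finished requests are ignored.
        future = self._pending.get(message.get("correlation_id"))
        if future is not None and not future.done():
            future.set_result(message)


async def fake_worker(requester: CorrelatingRequester) -> None:
    # Plays the role of a Ninja service: read one task, answer with its correlation ID.
    request = await requester.outbox.get()
    requester.on_result({"correlation_id": request["correlation_id"], "status": "VERIFIED"})


async def demo() -> dict:
    requester = CorrelatingRequester()
    worker = asyncio.create_task(fake_worker(requester))
    result = await requester.execute("linkedin_verification", timeout=1.0, account_id=123)
    await worker
    return result


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The pending-future map is what lets many concurrent execute calls share one results consumer: each result wakes only the caller whose correlation ID it carries.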

Advanced Usage Patterns

Fire and Forget

async def send_without_waiting():
    client = NinjaClient()

    try:
        # Send task without waiting for result; keep the correlation ID
        # if you want to match the result later
        correlation_id = await client.send_task(
            task="linkedin_verification",
            account_id=123
        )
        print(f"Task sent: {correlation_id}")
    finally:
        client.stop()

Batch Processing

async def process_multiple_accounts():
    client = NinjaClient()
    accounts = [123, 456, 789]

    try:
        # Send all tasks
        task_ids = []
        for account_id in accounts:
            task_id = await client.send_task("linkedin_verification", account_id=account_id)
            task_ids.append(task_id)

        # Listen for all results
        completed = 0
        async for result in client.listen_results(correlation_ids=task_ids):
            completed += 1
            print(f"Account {result.account_id}: {result.status}")
            if completed >= len(accounts):
                break
                
    finally:
        client.stop()

Synchronous Usage (Non-async Applications)

def sync_verification():
    client = NinjaClient()

    try:
        # Synchronous task execution
        result = client.execute_task_sync(
            task="linkedin_verification",
            account_id=123,
            email="user@example.com",
            timeout=60
        )
        
        print(f"Result: {result.status}")
        return result.is_success
        
    finally:
        client.stop()

Environment-Specific Usage

# Force specific environment
async def production_verification():
    # Explicitly use production configuration
    client = NinjaClient(environment="prod")
    try:
        # Uses KAFKA_PROD_SERVERS if set; otherwise emits a warning
        return await client.execute_task("linkedin_verification", account_id=123)
    finally:
        client.stop()

# Auto-detect environment
async def auto_verification():
    # Uses environment detection (local/dev/stage/prod)
    client = NinjaClient()
    try:
        return await client.execute_task("linkedin_verification", account_id=123)
    finally:
        client.stop()

🏗️ Available Tasks

LinkedIn Verification

result = await client.execute_task(
    task="linkedin_verification",
    account_id=123,
    email="user@example.com",  # Optional but highly recommended
    timeout=300  # 5 minutes
)

Future Tasks

More task types will be added for different platforms:

  • twitter_verification
  • instagram_verification
  • facebook_verification

📝 Message Models

Task Request

from dataclasses import dataclass
from typing import Any, Dict, Optional

@dataclass
class NinjaTaskRequest:
    task: str              # "linkedin_verification"
    account_id: int        # Account ID
    correlation_id: str    # Auto-generated UUID
    email: Optional[str]   # Account email
    user_id: Optional[int] # User ID
    metadata: Dict[str, Any]  # Additional parameters
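The request model lists correlation_id as auto-generated and metadata as a mapping of extra parameters. A sketch of how such a request could be built and round-tripped through JSON, assuming JSON is the wire format (the README does not say); TaskRequestSketch, to_bytes and from_bytes are hypothetical names, and the defaults are assumptions rather than the SDK's definitions.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, Optional


@dataclass
class TaskRequestSketch:
    # Field names mirror NinjaTaskRequest above; the defaults are assumptions.
    task: str
    account_id: int
    correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    email: Optional[str] = None
    user_id: Optional[int] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_bytes(self) -> bytes:
        # Assumed wire format: UTF-8 encoded JSON of all fields.
        return json.dumps(asdict(self)).encode("utf-8")

    @classmethod
    def from_bytes(cls, raw: bytes) -> "TaskRequestSketch":
        return cls(**json.loads(raw.decode("utf-8")))


if __name__ == "__main__":
    request = TaskRequestSketch("linkedin_verification", 123, email="user@example.com")
    print(request.to_bytes())
```

default_factory gives each request its own UUID and its own metadata dict; dataclasses reject a plain {} default precisely because it would be shared between instances.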

Task Result

from dataclasses import dataclass
from typing import Dict, Optional

@dataclass
class NinjaTaskResult:
    correlation_id: str     # Matches request
    task: str               # Task type
    status: str             # "VERIFIED", "FAILED", etc.
    success: bool           # True if successful
    account_id: int         # Account ID
    cookies: Optional[str]  # Extracted cookies
    data: Optional[Dict]    # Additional result data
    error: Optional[Dict]   # Error details if failed
    
    @property
    def is_success(self) -> bool:
        return self.success or self.status == 'VERIFIED'
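is_success accepts either the success flag or a VERIFIED status, so a result with success=False but status "VERIFIED" still counts as a success. A sketch of decoding a result payload with that rule, assuming results arrive as dicts whose keys match the field names above; TaskResultSketch and from_dict are hypothetical, and ignoring unknown keys is a design choice of this sketch, not documented SDK behaviour.

```python
from dataclasses import dataclass, fields
from typing import Any, Dict, Optional


@dataclass
class TaskResultSketch:
    correlation_id: str
    task: str
    status: str
    success: bool
    account_id: int
    cookies: Optional[str] = None
    data: Optional[Dict[str, Any]] = None
    error: Optional[Dict[str, Any]] = None

    @property
    def is_success(self) -> bool:
        # Same rule as NinjaTaskResult.is_success above.
        return self.success or self.status == "VERIFIED"

    @classmethod
    def from_dict(cls, payload: Dict[str, Any]) -> "TaskResultSketch":
        # Drop keys this sketch does not model, so extra fields from a newer
        # producer do not break decoding.
        known = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in payload.items() if k in known})


if __name__ == "__main__":
    result = TaskResultSketch.from_dict({
        "correlation_id": "c-1", "task": "linkedin_verification",
        "status": "VERIFIED", "success": False, "account_id": 123,
    })
    print(result.is_success)
```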

🚨 Error Handling

from ninja_kafka_sdk import (
    NinjaClient, NinjaTaskTimeoutError,
    NinjaTaskError, NinjaKafkaConnectionError
)

async def verify_with_error_handling(client: NinjaClient):
    try:
        return await client.execute_task("linkedin_verification", account_id=123)

    except NinjaTaskTimeoutError:
        print("Task did not finish before the timeout")

    except NinjaTaskError as e:
        print(f"Ninja couldn't complete task: {e.details}")

    except NinjaKafkaConnectionError:
        print("Can't connect to Kafka")

🔌 Extending for New Services

# Add new task types easily
await client.send_task(
    task="twitter_scraping",
    account_id=123,
    parameters={"target_user": "@elonmusk"}
)

# SDK handles routing to appropriate Ninja service

🔧 Troubleshooting

Common Configuration Issues

Issue: "Can't connect to Kafka"

# Check your servers configuration
from ninja_kafka_sdk.config import NinjaKafkaConfig
config = NinjaKafkaConfig()
print(f"Environment: {config.environment}")
print(f"Kafka servers: {config.kafka_servers}")
print(f"Consumer group: {config.consumer_group}")

Solutions:

  1. Local Development: Ensure Kafka is running on localhost:9092
  2. Stage/Prod: Verify KAFKA_STAGE_SERVERS or KAFKA_PROD_SERVERS are set
  3. Custom Provider: Use KAFKA_BOOTSTRAP_SERVERS for explicit override
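Before digging into SDK configuration, it can help to confirm that the broker addresses are reachable at all. A sketch using only Python's socket module; can_reach is a hypothetical helper that opens a plain TCP connection to each host:port in a comma-separated list, so True only shows the port is open, not that Kafka authentication or topic access will succeed.

```python
import socket
from typing import Dict


def can_reach(bootstrap_servers: str, timeout: float = 3.0) -> Dict[str, bool]:
    """Try a TCP connection to each host:port in a comma-separated server list."""
    status: Dict[str, bool] = {}
    for server in bootstrap_servers.split(","):
        server = server.strip()
        host, _, port = server.rpartition(":")
        try:
            with socket.create_connection((host, int(port)), timeout=timeout):
                status[server] = True
        except (OSError, ValueError, OverflowError):
            # Refused, unresolvable, timed out, or not a valid host:port entry.
            status[server] = False
    return status


if __name__ == "__main__":
    print(can_reach("localhost:9092"))
```

Checking each broker separately helps spot a cluster where only some of the listed brokers are reachable from your host.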

Issue: "No messages received"

import os
from ninja_kafka_sdk import NinjaClient

# Check which consumer group is in use (a shared group can hide messages)
print(f"Consumer group: {os.getenv('KAFKA_CONSUMER_GROUP', 'auto-detected')}")

# Set a unique consumer group before creating the client
os.environ['KAFKA_CONSUMER_GROUP'] = 'my-unique-group'
client = NinjaClient()
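Within one consumer group, Kafka hands each partition of a topic to exactly one member. If two unrelated services share a group, each sees only its share of the results topic, and a result landing on the other service's partition never reaches the caller, which then times out. A toy sketch of that split; assign_partitions and the "reporting-service" name are hypothetical, and the sketch uses simple round-robin while real Kafka clients use a configurable assignment strategy.

```python
from typing import Dict, List


def assign_partitions(partitions: int, members: List[str]) -> Dict[str, List[int]]:
    """Toy round-robin assignment: every partition goes to exactly one group member."""
    assignment: Dict[str, List[int]] = {member: [] for member in members}
    for partition in range(partitions):
        assignment[members[partition % len(members)]].append(partition)
    return assignment


if __name__ == "__main__":
    # One group shared by two services: each sees only half the partitions.
    print(assign_partitions(4, ["auto-login-service", "reporting-service"]))
    # A unique group per service: the lone member reads every partition.
    print(assign_partitions(4, ["auto-login-service"]))
```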

Issue: "Task timeout"

# Raise the client-wide default timeout for slow operations
client = NinjaClient(timeout=600)  # 10 minutes

# Or pass a timeout for a single call (here 5 minutes)
result = await client.execute_task("linkedin_verification", account_id=123, timeout=300)

Environment Detection Debug

from ninja_kafka_sdk.config import NinjaKafkaConfig

# Debug environment detection
config = NinjaKafkaConfig()
print(f"Environment: {config.environment}")
print(f"Servers: {config.kafka_servers}")

# Force specific environment
config = NinjaKafkaConfig(environment='stage')
print(f"Forced stage servers: {config.kafka_servers}")

Quick Health Check

from ninja_kafka_sdk import NinjaClient
import asyncio

async def health_check():
    client = NinjaClient()
    try:
        # Test the connection by sending a test message. Success only shows that
        # the message could be sent, not that a Ninja service processed it.
        correlation_id = await client.send_task("health_check", account_id=0)
        print(f"✅ Connection OK - Test message sent: {correlation_id}")
        return True
    except Exception as e:
        print(f"❌ Connection failed: {e}")
        return False
    finally:
        client.stop()

# Run health check
asyncio.run(health_check())

📚 Appendix: For Service Implementers

This section contains information for developers implementing Ninja services (like browser-ninja) that process tasks and send results back.

Sending Task Results

If you're building a service that processes Ninja tasks, use these methods to send results:

from ninja_kafka_sdk import NinjaClient

async def send_verification_result():
    # Configure client for service that processes tasks
    client = NinjaClient(
        kafka_servers="your-kafka-servers:9092",
        consumer_group="browser-ninja",  # Service-specific consumer group
        environment="prod"
    )
    
    try:
        # Send success result
        await client.send_success_result(
            correlation_id="task-123-456",
            account_id=12345,
            email="user@example.com",
            cookies="extracted_cookies_data",
            screenshot="base64_screenshot"
        )
        
        # Or send error result
        await client.send_error_result(
            correlation_id="task-123-457",
            account_id=12346,
            email="user2@example.com",
            error_code="LOGIN_FAILED",
            error_message="Invalid credentials"
        )
        
    finally:
        client.stop()

Listening for Tasks (Future Feature)

from ninja_kafka_sdk import NinjaClient

async def process_ninja_tasks():
    client = NinjaClient(
        kafka_servers="your-kafka-servers:9092",
        consumer_group="browser-ninja"
    )
    
    try:
        # Listen for incoming tasks
        async for task in client.listen_tasks():
            print(f"📥 Received task: {task.task} for account {task.account_id}")
            
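            # Process the task (process_linkedin_verification is your own
            # coroutine returning a result dict; it is not part of the SDK)
            if task.task == "linkedin_verification":
                result = await process_linkedin_verification(task)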
                
                # Send result back
                if result["success"]:
                    await client.send_success_result(
                        correlation_id=task.correlation_id,
                        account_id=task.account_id,
                        email=task.email,
                        cookies=result["cookies"]
                    )
                else:
                    await client.send_error_result(
                        correlation_id=task.correlation_id,
                        account_id=task.account_id,
                        email=task.email,
                        error_code=result["error_code"],
                        error_message=result["error_message"]
                    )
                    
    finally:
        client.stop()
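The loop above only answers linkedin_verification tasks; any other task type gets no reply, so its sender waits until its timeout. One common pattern is a dispatch table that maps task names to handler coroutines and always produces a result. A sketch, assuming handlers return dicts shaped like the result dict in the loop above (success, cookies, error_code, error_message); HANDLERS, dispatch, the placeholder handler and the UNSUPPORTED_TASK / HANDLER_ERROR codes are hypothetical.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, Awaitable, Callable, Dict

Handler = Callable[[Any], Awaitable[Dict[str, Any]]]


async def handle_linkedin_verification(task: Any) -> Dict[str, Any]:
    # Hypothetical placeholder for the real verification logic.
    return {"success": True, "cookies": "example-cookie-data"}


HANDLERS: Dict[str, Handler] = {
    "linkedin_verification": handle_linkedin_verification,
}


async def dispatch(task: Any) -> Dict[str, Any]:
    """Route a task to its handler and always return a result dict."""
    handler = HANDLERS.get(task.task)
    if handler is None:
        # Answer unknown task types so the requester is not left waiting.
        return {
            "success": False,
            "error_code": "UNSUPPORTED_TASK",
            "error_message": f"No handler for task type {task.task!r}",
        }
    try:
        return await handler(task)
    except Exception as exc:  # convert handler crashes into error results
        return {"success": False, "error_code": "HANDLER_ERROR", "error_message": str(exc)}


if __name__ == "__main__":
    # SimpleNamespace stands in for the task objects yielded by listen_tasks().
    print(asyncio.run(dispatch(SimpleNamespace(task="linkedin_verification"))))
    print(asyncio.run(dispatch(SimpleNamespace(task="twitter_verification"))))
```

Inside the listening loop, result = await dispatch(task) would replace the if task.task == ... branch, followed by the same send_success_result / send_error_result calls; adding a new task type then only means registering another handler.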

The Ninja Kafka SDK hides Kafka producer, consumer, and correlation-ID handling behind a simple task-based API.

Download files


Source Distribution

ninja_kafka_sdk-0.1.3.tar.gz (37.2 kB)

Uploaded Source

Built Distribution


ninja_kafka_sdk-0.1.3-py3-none-any.whl (38.8 kB)

Uploaded Python 3

File details

Details for the file ninja_kafka_sdk-0.1.3.tar.gz.

File metadata

  • Download URL: ninja_kafka_sdk-0.1.3.tar.gz
  • Upload date:
  • Size: 37.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for ninja_kafka_sdk-0.1.3.tar.gz

  • SHA256: 487b41856a117c60a2308fe46ddf493886a3b74ce51d7028541c4000f6d25326
  • MD5: 2b8801176b8128cfd19d3bcc4a41789f
  • BLAKE2b-256: cf8abcf5cff35fd19a321e284edfac4b5d9b9749aab303824058e18d6802c5a5


File details

Details for the file ninja_kafka_sdk-0.1.3-py3-none-any.whl.

File metadata

File hashes

Hashes for ninja_kafka_sdk-0.1.3-py3-none-any.whl

  • SHA256: 760aeea2ca69b51307b29d011d53304d1fed46708faec96a75acfa0fd9596481
  • MD5: ece502e145726507cd156869b2615607
  • BLAKE2b-256: 5072ae286250c0cfd6e63d5f69e0b5886588a4862d108da2e3c23e232e2ad0da

