
Kafka-based distributed task processing SDK


Ninja Kafka SDK

Environment-agnostic SDK for distributed task processing with Kafka messaging.

Send tasks to Ninja services and get results back. Configuration is explicit, so there are no localhost fallbacks and no surprises in production.

🚀 Quick Start - Send Your First Task

import asyncio

from ninja_kafka_sdk import NinjaClient

async def main():
    # Explicit configuration (REQUIRED in v0.2.0+)
    client = NinjaClient(
        kafka_servers="your-kafka-servers:9092",  # REQUIRED: explicit broker addresses
        consumer_group="your-service-name"        # REQUIRED: unique consumer group
    )

    try:
        # Send task and wait for result
        result = await client.execute_task(
            task="linkedin_verification",
            account_id=123,
            email="user@example.com"
        )

        if result.is_success:
            print("✅ Task completed successfully!")
            print(f"Result data: {result.data}")
        else:
            print(f"❌ Failed: {result.error_message}")
    finally:
        client.stop()

asyncio.run(main())

📦 Installation

# Install from PyPI
pip install ninja-kafka-sdk

# Or install specific version
pip install ninja-kafka-sdk==0.2.0

⚙️ Configuration

Explicit Configuration (Recommended)

from ninja_kafka_sdk import NinjaClient

client = NinjaClient(
    kafka_servers="your-kafka-servers:9092",  # Change for your environment
    consumer_group="your-service-name"        # Unique name for your service
)

Configuration from Your Application's Settings

from ninja_kafka_sdk import NinjaClient
from your_app.config import config  # Your application's config

client = NinjaClient(
    kafka_servers=config.KAFKA_SERVERS,
    consumer_group=config.KAFKA_CONSUMER_GROUP
)

Configuration with a NinjaKafkaConfig Object

from ninja_kafka_sdk import NinjaClient, NinjaKafkaConfig

# Create configuration object
config = NinjaKafkaConfig(
    kafka_servers="b-1.msk-cluster.amazonaws.com:9092,b-2.msk-cluster.amazonaws.com:9092",
    consumer_group="my-service",
    environment="stage",
    tasks_topic="ninja-tasks",
    results_topic="ninja-results"
)

# Use with client
client = NinjaClient(config=config)
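Both forms take `kafka_servers` as one comma-separated string of `host:port` entries. If you build that string from environment-specific settings, a quick sanity check can catch a missing port or an empty value before the client tries to connect. The sketch below is a hypothetical helper (`parse_bootstrap_servers` is not part of ninja_kafka_sdk) that splits and validates such a string:

```python
from typing import List, Tuple

def parse_bootstrap_servers(servers: str) -> List[Tuple[str, int]]:
    """Split a comma-separated 'host:port' string into (host, port) pairs."""
    brokers: List[Tuple[str, int]] = []
    for entry in servers.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate stray or trailing commas
        # rpartition keeps everything before the last ':' as the host
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"Expected host:port, got {entry!r}")
        brokers.append((host, int(port)))
    if not brokers:
        raise ValueError("kafka_servers must list at least one broker")
    return brokers

print(parse_bootstrap_servers(
    "b-1.msk-cluster.amazonaws.com:9092,b-2.msk-cluster.amazonaws.com:9092"))
# [('b-1.msk-cluster.amazonaws.com', 9092), ('b-2.msk-cluster.amazonaws.com', 9092)]
```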

🆕 Version 0.2.0 Breaking Changes

⚠️ BREAKING CHANGE: kafka_servers and consumer_group are now REQUIRED parameters.

Migration from v0.1.x

# OLD (v0.1.x) - Had auto-detection and localhost fallbacks
client = NinjaClient()  # ❌ This no longer works

# NEW (v0.2.0+) - Explicit configuration required  
client = NinjaClient(
    kafka_servers="your-kafka-servers:9092",  # ✅ Required
    consumer_group="your-service-name"        # ✅ Required
)

Why This Change?

  • Production Safety: Prevents localhost fallbacks in production environments
  • Explicit Configuration: No more guessing what environment you're connecting to
  • Debugging: Clear errors when configuration is missing
  • Environment Agnostic: Same code works everywhere with different config
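The fail-fast behavior described above can be sketched with a plain dataclass that refuses to construct without broker addresses and a consumer group. `ExplicitKafkaSettings` is a hypothetical stand-in written for illustration; it is not the SDK's `NinjaKafkaConfig`, whose validation logic this README does not show.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class ExplicitKafkaSettings:
    """Hypothetical stand-in for an explicit Kafka config (not NinjaKafkaConfig)."""
    kafka_servers: Optional[str] = None
    consumer_group: Optional[str] = None
    environment: str = "unspecified"  # a label only; never used to pick brokers

    def __post_init__(self) -> None:
        missing = [name for name in ("kafka_servers", "consumer_group")
                   if not getattr(self, name)]
        if missing:
            # Fail at construction time instead of silently falling back to localhost
            raise ValueError(f"Missing required Kafka settings: {', '.join(missing)}")

try:
    ExplicitKafkaSettings()
except ValueError as err:
    print(err)  # Missing required Kafka settings: kafka_servers, consumer_group
```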

💡 How to Send Tasks

Basic Task Execution

from ninja_kafka_sdk import NinjaClient

async def verify_linkedin_account():
    # Explicit configuration for production
    client = NinjaClient(
        kafka_servers="b-1.msk-cluster.amazonaws.com:9092,b-2.msk-cluster.amazonaws.com:9092",
        consumer_group="auto-login-service",
        environment="prod"
    )
    
    try:
        # Send task and wait for result (one method call)
        result = await client.execute_task(
            task="linkedin_verification",
            account_id=12345,
            email="user@example.com",
            timeout=300  # 5 minutes
        )
        
        if result.is_success:
            print("✅ Verification successful!")
            return result.cookies
        else:
            print(f"❌ Failed: {result.error_message}")
            return None
            
    finally:
        client.stop()
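`execute_task` sends a task and then waits for the result carrying the same correlation ID. A common way to build that kind of request/reply over Kafka is to park a future per correlation ID and complete it when the result consumer sees a matching message. The sketch below shows the pattern in memory with `asyncio`; `PendingReplies` is hypothetical and is not claimed to be how ninja_kafka_sdk is implemented.

```python
import asyncio
import uuid
from typing import Dict

class PendingReplies:
    """Match results to requests by correlation ID (in-memory sketch)."""

    def __init__(self) -> None:
        self._waiters: Dict[str, asyncio.Future] = {}

    def register(self) -> str:
        # One future per outstanding task, keyed by a fresh correlation ID
        correlation_id = str(uuid.uuid4())
        self._waiters[correlation_id] = asyncio.get_running_loop().create_future()
        return correlation_id

    def deliver(self, correlation_id: str, result: dict) -> bool:
        # Results for unknown, timed-out, or already-answered IDs are ignored
        future = self._waiters.get(correlation_id)
        if future is None or future.done():
            return False
        future.set_result(result)
        return True

    async def wait(self, correlation_id: str, timeout: float) -> dict:
        future = self._waiters[correlation_id]
        try:
            return await asyncio.wait_for(future, timeout)
        finally:
            self._waiters.pop(correlation_id, None)  # forget the ID either way

async def demo() -> dict:
    pending = PendingReplies()
    cid = pending.register()
    # Simulate the result consumer receiving the matching reply shortly after
    asyncio.get_running_loop().call_later(0.01, pending.deliver, cid, {"status": "VERIFIED"})
    return await pending.wait(cid, timeout=1.0)

print(asyncio.run(demo()))  # {'status': 'VERIFIED'}
```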

Advanced Usage Patterns

Fire and Forget

async def send_task_without_waiting():
    # Must provide explicit configuration
    client = NinjaClient(
        kafka_servers="localhost:9092",
        consumer_group="task-sender"
    )

    try:
        # Send task without waiting for result; keep the ID to match the result later
        correlation_id = await client.send_task(
            task="linkedin_verification",
            account_id=123
        )
        print(f"Task sent: {correlation_id}")
    finally:
        client.stop()

Batch Processing

async def process_multiple_accounts():
    client = NinjaClient(
        kafka_servers="your-kafka-servers:9092",
        consumer_group="batch-processor"
    )
    accounts = [123, 456, 789]

    try:
        # Send all tasks
        task_ids = []
        for account_id in accounts:
            task_id = await client.send_task("linkedin_verification", account_id=account_id)
            task_ids.append(task_id)

        # Listen for all results
        completed = 0
        async for result in client.listen_results(correlation_ids=task_ids):
            completed += 1
            print(f"Account {result.account_id}: {result.status}")
            if completed >= len(accounts):
                break
                
    finally:
        client.stop()
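The loop above counts every result it receives, which is correct only if `listen_results(correlation_ids=...)` yields nothing but the requested IDs, each exactly once. Kafka consumers commonly get at-least-once delivery, so tracking the outstanding IDs in a set is a little more robust. A sketch with a hypothetical `collect_results` helper, using plain dicts in place of result objects:

```python
import asyncio
from typing import AsyncIterator, Dict, Iterable, List

async def collect_results(results: AsyncIterator[dict],
                          correlation_ids: Iterable[str]) -> Dict[str, dict]:
    """Consume results until every expected correlation ID has been seen once."""
    pending = set(correlation_ids)
    collected: Dict[str, dict] = {}
    if not pending:
        return collected  # nothing to wait for
    async for result in results:
        cid = result.get("correlation_id")
        if cid not in pending:
            continue  # unrelated or duplicate result: skip it
        pending.discard(cid)
        collected[cid] = result
        if not pending:
            break
    return collected

async def fake_stream(items: List[dict]) -> AsyncIterator[dict]:
    # Stands in for a stream of results read from the results topic
    for item in items:
        await asyncio.sleep(0)
        yield item

stream = fake_stream([
    {"correlation_id": "a", "status": "VERIFIED"},
    {"correlation_id": "x", "status": "VERIFIED"},  # not one of ours
    {"correlation_id": "a", "status": "VERIFIED"},  # duplicate delivery
    {"correlation_id": "b", "status": "FAILED"},
])
print(asyncio.run(collect_results(stream, ["a", "b"])))
```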

Different Environment Examples

# Local development
async def local_verification():
    client = NinjaClient(
        kafka_servers="localhost:9092",
        consumer_group="local-test",
        environment="local"  # Optional: for logging only
    )
    result = await client.execute_task("linkedin_verification", account_id=123)
    client.stop()
    return result

# Production environment
async def production_verification():
    client = NinjaClient(
        kafka_servers="b-1.msk-cluster.amazonaws.com:9092,b-2.msk-cluster.amazonaws.com:9092",
        consumer_group="auto-login-prod",
        environment="prod"
    )
    result = await client.execute_task("linkedin_verification", account_id=123)
    client.stop()
    return result

# Using config object  
async def config_based_verification():
    from your_app.config import config
    
    client = NinjaClient(
        kafka_servers=config.KAFKA_SERVERS,
        consumer_group=config.KAFKA_CONSUMER_GROUP
    )
    result = await client.execute_task("linkedin_verification", account_id=123)
    client.stop()
    return result

🏗️ Available Tasks

LinkedIn Verification

result = await client.execute_task(
    task="linkedin_verification",
    account_id=123,
    email="user@example.com",  # Optional but highly recommended
    timeout=300  # 5 minutes
)

Future Tasks

More task types will be added for different platforms:

  • twitter_verification
  • instagram_verification
  • facebook_verification

📝 Message Models

Task Request

from dataclasses import dataclass
from typing import Any, Dict, Optional

@dataclass
class NinjaTaskRequest:
    task: str              # "linkedin_verification"
    account_id: int        # Account ID
    correlation_id: str    # Auto-generated UUID
    email: Optional[str]   # Account email
    user_id: Optional[int] # User ID
    metadata: Dict[str, Any]  # Additional parameters

Task Result

@dataclass 
class NinjaTaskResult:
    correlation_id: str    # Matches request
    task: str             # Task type
    status: str           # "VERIFIED", "FAILED", etc.
    success: bool         # True if successful
    account_id: int       # Account ID
    cookies: Optional[str] # Extracted cookies
    data: Optional[Dict]   # Additional result data
    error: Optional[Dict]  # Error details if failed
    
    @property
    def is_success(self) -> bool:
        return self.success or self.status == 'VERIFIED'
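The README does not specify how these models are serialized on the Kafka topics. Assuming JSON (a common choice, but an assumption here), a request can be encoded and a result checked with the same rule as `is_success` as shown below. `TaskRequestSketch` is a hypothetical mirror of `NinjaTaskRequest`, with defaults added so the correlation ID is generated automatically.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, Optional

@dataclass
class TaskRequestSketch:
    """Hypothetical mirror of NinjaTaskRequest, for illustration only."""
    task: str
    account_id: int
    email: Optional[str] = None
    user_id: Optional[int] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
    correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))

def encode_request(request: TaskRequestSketch) -> bytes:
    # Kafka message values are bytes; JSON is assumed as the wire format
    return json.dumps(asdict(request)).encode("utf-8")

def result_is_success(payload: bytes) -> bool:
    data = json.loads(payload)
    # Same rule as NinjaTaskResult.is_success above
    return bool(data.get("success")) or data.get("status") == "VERIFIED"

req = TaskRequestSketch(task="linkedin_verification", account_id=123)
wire = encode_request(req)
print(json.loads(wire)["correlation_id"] == req.correlation_id)  # True
print(result_is_success(b'{"success": false, "status": "VERIFIED"}'))  # True
```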

🚨 Error Handling

from ninja_kafka_sdk import (
    NinjaClient, NinjaTaskTimeoutError,
    NinjaTaskError, NinjaKafkaConnectionError
)

async def verify_with_error_handling(client: NinjaClient):
    try:
        return await client.execute_task("linkedin_verification", account_id=123)

    except NinjaTaskTimeoutError:
        print("Task took too long")

    except NinjaTaskError as e:
        print(f"Ninja couldn't complete task: {e.details}")

    except NinjaKafkaConnectionError:
        print("Can't connect to Kafka")

    return None
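If timeouts are transient, you can retry with exponential backoff. Keep in mind that each retry submits a new task, so the earlier one may still complete and a verification may run twice; only retry tasks that are safe to repeat. In this self-contained sketch, `TaskTimeout` stands in for `NinjaTaskTimeoutError` and `retry_on_timeout` is hypothetical; with the real client you would pass something like `lambda: client.execute_task(...)`.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")

class TaskTimeout(Exception):
    """Stand-in for NinjaTaskTimeoutError in this self-contained sketch."""

async def retry_on_timeout(make_call: Callable[[], Awaitable[T]],
                           attempts: int = 3,
                           base_delay: float = 1.0) -> T:
    """Retry a coroutine factory on timeout, doubling the delay each time."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(attempts):
        try:
            return await make_call()  # a fresh call (and task) on every attempt
        except TaskTimeout:
            if attempt == attempts - 1:
                raise  # out of retries: surface the timeout to the caller
            await asyncio.sleep(base_delay * 2 ** attempt)
    raise AssertionError("unreachable")

calls = {"n": 0}

async def flaky_task() -> str:
    # Times out twice, then succeeds
    calls["n"] += 1
    if calls["n"] < 3:
        raise TaskTimeout()
    return "VERIFIED"

print(asyncio.run(retry_on_timeout(flaky_task, base_delay=0.01)))  # VERIFIED
```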

🔌 Extending for New Services

# Send a new task type (a Ninja service must be able to process it)
await client.send_task(
    task="twitter_scraping",
    account_id=123,
    parameters={"target_user": "@elonmusk"}
)

# SDK handles routing to appropriate Ninja service
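This README does not describe how the SDK routes a task to a service. One simple model, assumed here for illustration, is a registry from task name to topic that rejects unknown task types up front instead of letting them sit unprocessed until the caller times out. The table entries and the `ninja-twitter-tasks` topic are hypothetical.

```python
from typing import Dict, Optional

# Hypothetical routing table: task type -> topic a Ninja service consumes
TASK_ROUTES: Dict[str, str] = {
    "linkedin_verification": "ninja-tasks",
    "twitter_scraping": "ninja-twitter-tasks",
}

def route_task(task: str, routes: Optional[Dict[str, str]] = None) -> str:
    """Return the topic for a task type, or fail fast if nothing handles it."""
    table = TASK_ROUTES if routes is None else routes
    if task not in table:
        raise ValueError(f"No Ninja service registered for task {task!r}")
    return table[task]

print(route_task("twitter_scraping"))  # ninja-twitter-tasks
```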

🔧 Troubleshooting

Common Configuration Issues

Issue: "Can't connect to Kafka"

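# Check the values your client is actually given (v0.2.0+ has no auto-detection)
from ninja_kafka_sdk.config import NinjaKafkaConfig
from your_app.config import config as app_config  # Your application's config

config = NinjaKafkaConfig(
    kafka_servers=app_config.KAFKA_SERVERS,
    consumer_group=app_config.KAFKA_CONSUMER_GROUP
)
print(f"Environment: {config.environment}")
print(f"Kafka servers: {config.kafka_servers}")
print(f"Consumer group: {config.consumer_group}")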

Solutions:

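  1. Local Development: Ensure Kafka is running on localhost:9092 and pass kafka_servers="localhost:9092"
  2. Stage/Prod: Verify that the broker addresses you pass in kafka_servers are correct and reachable (for example, check that KAFKA_STAGE_SERVERS or KAFKA_PROD_SERVERS is set if your application reads its broker list from them)
  3. Custom Provider: Pass the provider's bootstrap servers to kafka_servers as a comma-separated host:port list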

Issue: "No messages received"

# Check for consumer group conflicts: services that share a consumer group
# split the topic's partitions, so each one receives only part of the messages.
# Give every service its own group.
client = NinjaClient(
    kafka_servers="your-kafka-servers:9092",
    consumer_group="my-unique-group"
)
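Within one consumer group, Kafka assigns each partition to exactly one consumer, while separate groups each receive every message. The simplified model below (real assignors such as range, round-robin, and sticky differ in detail) shows why two unrelated services sharing a group each see only half of the partitions; `assign_partitions` is hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

def assign_partitions(partitions: int,
                      consumers: List[Tuple[str, str]]) -> Dict[str, List[int]]:
    """Simplified round-robin: within each group, every partition goes to one member."""
    groups: Dict[str, List[str]] = defaultdict(list)
    for consumer_id, group in consumers:
        groups[group].append(consumer_id)
    assignment: Dict[str, List[int]] = {cid: [] for cid, _ in consumers}
    for members in groups.values():
        for p in range(partitions):
            assignment[members[p % len(members)]].append(p)
    return assignment

shared = assign_partitions(4, [("service-a", "results"), ("service-b", "results")])
print(shared)    # {'service-a': [0, 2], 'service-b': [1, 3]}
separate = assign_partitions(4, [("service-a", "group-a"), ("service-b", "group-b")])
print(separate)  # {'service-a': [0, 1, 2, 3], 'service-b': [0, 1, 2, 3]}
```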

Issue: "Task timeout"

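# Increase timeouts for slow operations (values in seconds)
client = NinjaClient(
    kafka_servers="your-kafka-servers:9092",
    consumer_group="your-service-name",
    timeout=600  # client default: 10 minutes
)
# Per-call timeout for this task only: 5 minutes
result = await client.execute_task("linkedin_verification", account_id=123, timeout=300)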
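How the client-level and per-call timeouts interact is not spelled out above. A reasonable reading, assumed in this sketch, is that a per-call `timeout` overrides the client default; `run_with_timeout` is a hypothetical helper built on `asyncio.wait_for`.

```python
import asyncio
from typing import Awaitable, Optional, TypeVar

T = TypeVar("T")

async def run_with_timeout(call: Awaitable[T], per_call: Optional[float],
                           default: float) -> T:
    # Assumed precedence: a per-call timeout wins over the client default
    timeout = per_call if per_call is not None else default
    return await asyncio.wait_for(call, timeout)

async def slow_task() -> str:
    await asyncio.sleep(0.05)  # stands in for a long-running Ninja task
    return "done"

async def main() -> None:
    print(await run_with_timeout(slow_task(), per_call=None, default=1.0))  # done
    try:
        await run_with_timeout(slow_task(), per_call=0.01, default=1.0)
    except asyncio.TimeoutError:
        print("timed out")  # the shorter per-call limit applied

asyncio.run(main())
```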

Environment Configuration Debug

from ninja_kafka_sdk.config import NinjaKafkaConfig

# In v0.2.0+ nothing is auto-detected: environment is a label,
# and the brokers are whatever you pass in kafka_servers
config = NinjaKafkaConfig(
    kafka_servers="your-kafka-servers:9092",
    consumer_group="your-service-name",
    environment="stage"
)
print(f"Environment: {config.environment}")
print(f"Servers: {config.kafka_servers}")

Quick Health Check

from ninja_kafka_sdk import NinjaClient
import asyncio

async def health_check():
    client = NinjaClient(
        kafka_servers="your-kafka-servers:9092",
        consumer_group="health-check"
    )
    try:
        # Test connection by sending a test message
        correlation_id = await client.send_task("health_check", account_id=0)
        print(f"✅ Connection OK - Test message sent: {correlation_id}")
        return True
    except Exception as e:
        print(f"❌ Connection failed: {e}")
        return False
    finally:
        client.stop()

# Run health check
asyncio.run(health_check())
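The check above publishes a real `health_check` task to the tasks topic, which any running Ninja service may consume. If you only need to know whether a broker is reachable over the network, a plain TCP connection attempt is enough. This sketch uses only `asyncio` from the standard library (`broker_reachable` is a hypothetical helper); note that an open port does not prove the broker is healthy or that your topics and consumer group are configured correctly.

```python
import asyncio

async def broker_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port opens within timeout seconds."""
    try:
        _reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port), timeout)
    except (OSError, asyncio.TimeoutError):
        return False  # refused, unresolvable, or too slow
    writer.close()
    try:
        await writer.wait_closed()
    except OSError:
        pass  # the peer may already have closed the socket
    return True

async def demo() -> None:
    # Start a throwaway local server so the check has something to reach
    server = await asyncio.start_server(lambda r, w: w.close(), "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    print(await broker_reachable("127.0.0.1", port))  # True
    server.close()
    await server.wait_closed()

asyncio.run(demo())
```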

📚 Appendix: For Service Implementers

This section contains information for developers implementing Ninja services (like browser-ninja) that process tasks and send results back.

Sending Task Results

If you're building a service that processes Ninja tasks, use these methods to send results:

from ninja_kafka_sdk import NinjaClient

async def send_verification_result():
    # Configure client for service that processes tasks
    client = NinjaClient(
        kafka_servers="your-kafka-servers:9092",
        consumer_group="browser-ninja",  # Service-specific consumer group
        environment="prod"
    )
    
    try:
        # Send success result
        await client.send_success_result(
            correlation_id="task-123-456",
            account_id=12345,
            email="user@example.com",
            cookies="extracted_cookies_data",
            screenshot="base64_screenshot"
        )
        
        # Or send error result
        await client.send_error_result(
            correlation_id="task-123-457",
            account_id=12346,
            email="user2@example.com",
            error_code="LOGIN_FAILED",
            error_message="Invalid credentials"
        )
        
    finally:
        client.stop()
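The payload behind `send_success_result` and `send_error_result` is not documented here. Assuming it carries the `NinjaTaskResult` fields listed under Message Models, a result can be shaped as below; `build_result` and the `{"code", "message"}` layout of `error` are hypothetical. Whatever the exact format, the result must echo the request's `correlation_id`, or the caller's `execute_task` cannot match it.

```python
from typing import Any, Dict, Optional

def build_result(correlation_id: str, account_id: int, *,
                 task: str = "linkedin_verification",
                 cookies: Optional[str] = None,
                 error_code: Optional[str] = None,
                 error_message: Optional[str] = None) -> Dict[str, Any]:
    """Shape a result dict using the NinjaTaskResult field names (hypothetical)."""
    failed = error_code is not None
    return {
        "correlation_id": correlation_id,  # must echo the request's ID
        "task": task,
        "status": "FAILED" if failed else "VERIFIED",
        "success": not failed,
        "account_id": account_id,
        "cookies": None if failed else cookies,
        "data": None,
        # Assumed error layout; the real SDK may differ
        "error": {"code": error_code, "message": error_message} if failed else None,
    }

print(build_result("task-123-457", 12346, error_code="LOGIN_FAILED",
                   error_message="Invalid credentials")["status"])  # FAILED
```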

Listening for Tasks (Future Feature)

from ninja_kafka_sdk import NinjaClient

async def process_ninja_tasks():
    client = NinjaClient(
        kafka_servers="your-kafka-servers:9092",
        consumer_group="browser-ninja"
    )
    
    try:
        # Listen for incoming tasks
        async for task in client.listen_tasks():
            print(f"📥 Received task: {task.task} for account {task.account_id}")
            
            # Process the task
            if task.task == "linkedin_verification":
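                # process_linkedin_verification is your own service logic; it should return
                # a dict with "success" plus "cookies" or "error_code"/"error_message"
                result = await process_linkedin_verification(task)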
                
                # Send result back
                if result["success"]:
                    await client.send_success_result(
                        correlation_id=task.correlation_id,
                        account_id=task.account_id,
                        email=task.email,
                        cookies=result["cookies"]
                    )
                else:
                    await client.send_error_result(
                        correlation_id=task.correlation_id,
                        account_id=task.account_id,
                        email=task.email,
                        error_code=result["error_code"],
                        error_message=result["error_message"]
                    )
                    
    finally:
        client.stop()
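In the loop above, an exception raised by `process_linkedin_verification` escapes the `async for` and stops the worker, and tasks with other `task` names get no reply at all; in both cases the caller waits until its timeout. One way to guarantee a reply for every task is to wrap each handler call, as in this sketch. `handle_one`, the handler registry, and the `UNSUPPORTED_TASK`/`HANDLER_ERROR` codes are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

Handler = Callable[[dict], Awaitable[dict]]

async def handle_one(task: dict, handlers: Dict[str, Handler]) -> dict:
    """Run the handler for one task and always return a result dict."""
    base = {"correlation_id": task["correlation_id"], "account_id": task["account_id"]}
    handler = handlers.get(task["task"])
    if handler is None:
        return {**base, "success": False, "error_code": "UNSUPPORTED_TASK"}
    try:
        return {**base, "success": True, **(await handler(task))}
    except Exception as exc:  # report the failure instead of dropping the task
        return {**base, "success": False, "error_code": "HANDLER_ERROR",
                "error_message": str(exc)}

async def fake_linkedin(task: dict) -> dict:
    # Hypothetical handler: fails for account 0, succeeds otherwise
    if task["account_id"] == 0:
        raise RuntimeError("login page changed")
    return {"cookies": "session=abc"}

tasks = [
    {"task": "linkedin_verification", "correlation_id": "c1", "account_id": 1},
    {"task": "linkedin_verification", "correlation_id": "c2", "account_id": 0},
    {"task": "unknown_task", "correlation_id": "c3", "account_id": 2},
]

async def main() -> List[dict]:
    handlers = {"linkedin_verification": fake_linkedin}
    return [await handle_one(t, handlers) for t in tasks]

for r in asyncio.run(main()):
    print(r["correlation_id"], r["success"], r.get("error_code"))
# c1 True None
# c2 False HANDLER_ERROR
# c3 False UNSUPPORTED_TASK
```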

The Ninja Kafka SDK wraps Kafka messaging in a task-oriented API: clients send tasks and receive results matched by correlation ID, and Ninja services use the same client to send those results back.



Download files


Source Distribution

ninja_kafka_sdk-0.3.5.tar.gz (38.5 kB)

Uploaded Source

Built Distribution


ninja_kafka_sdk-0.3.5-py3-none-any.whl (39.8 kB)

Uploaded Python 3

File details

Details for the file ninja_kafka_sdk-0.3.5.tar.gz.

File metadata

  • Download URL: ninja_kafka_sdk-0.3.5.tar.gz
  • Upload date:
  • Size: 38.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for ninja_kafka_sdk-0.3.5.tar.gz
Algorithm Hash digest
SHA256 fab3976dfbe493ec21b04afe1a64f7f819a70e5d52a1405117f2d87f51d82822
MD5 973ea9fe117a3a1c88779992cbe39ed3
BLAKE2b-256 d13db5b9aeb4f8038eb636980fb573fc0e0304923ecbefa31c656babfaf9f5c6


File details

Details for the file ninja_kafka_sdk-0.3.5-py3-none-any.whl.

File metadata

File hashes

Hashes for ninja_kafka_sdk-0.3.5-py3-none-any.whl
Algorithm Hash digest
SHA256 6efa9afdd720db7289e0b6c99975e7f891d7acdf7155038ff9eea0895b418417
MD5 e59ac7d03c1a9bf15b0c4335e441f78f
BLAKE2b-256 1a42f001f830c74076403b243ecd22091a4a7fdd3da35a72ef6777b3cda27067

