Kafka-based distributed task processing SDK
Ninja Kafka SDK
Environment-agnostic SDK for distributed task processing with Kafka messaging.
Send tasks to Ninja services and get results back with explicit configuration - no more localhost fallbacks or production surprises!
🚀 Quick Start - Send Your First Task
import asyncio
from ninja_kafka_sdk import NinjaClient
async def send_task_example():
    # Explicit configuration (REQUIRED)
    client = NinjaClient(
        kafka_servers="localhost:9092",  # REQUIRED: Kafka broker addresses
        consumer_group="my-service"  # REQUIRED: Your service's consumer group
    )
    # Send task with parameters (v0.3.23+ properly handles parameters field)
    correlation_id = await client.send_task(
        task="linkedin_like",
        account_id=123,
        email="user@example.com",
        # Task-specific parameters go in parameters field (NOT in metadata)
        parameters={
            'post_url': 'https://www.linkedin.com/posts/example',
            'author_first_name': 'John',
            'author_last_name': 'Doe'
        },
        # API endpoints for callbacks
        api_endpoints={
            'callback_url': 'https://api.example.com/callback'
        }
    )
    print(f"✅ Task sent with correlation ID: {correlation_id}")
    return correlation_id
# Run the async function
asyncio.run(send_task_example())
📦 Installation
# Install from PyPI
pip install ninja-kafka-sdk
# Or install specific version
pip install ninja-kafka-sdk==0.3.24
📨 Listening for Task Results
import asyncio
from ninja_kafka_sdk import NinjaClient
async def listen_for_results():
    client = NinjaClient(
        kafka_servers="localhost:9092",
        consumer_group="result-listener"
    )
    # Listen for all results
    async for result in client.listen_results():
        print("📥 Received result:")
        print(f" Correlation ID: {result.get('correlation_id')}")
        print(f" Status: {result.get('status')}")
        if result.get('status') == 'success':
            data = result.get('data', {})
            print(f" ✅ Success! Data: {data}")
        else:
            error = result.get('error', {})
            print(f" ❌ Failed! Error: {error}")
# Run the listener
asyncio.run(listen_for_results())
Listen for Specific Task Results
import asyncio
from ninja_kafka_sdk import NinjaClient
async def wait_for_specific_task():
    client = NinjaClient(
        kafka_servers="localhost:9092",
        consumer_group="my-service"
    )
    # Send a task
    correlation_id = await client.send_task(
        task="linkedin_post",
        account_id=456,
        parameters={
            'post_content': "Hello LinkedIn!",
            'wait_before_post': (2, 5)
        }
    )
    # Listen only for this specific result
    async for result in client.listen_results(correlation_ids=[correlation_id]):
        if result.get('correlation_id') == correlation_id:
            print(f"Got our result! Status: {result.get('status')}")
            break  # Stop listening after receiving our result
asyncio.run(wait_for_specific_task())
⚙️ Configuration
Explicit Configuration (Recommended)
from ninja_kafka_sdk import NinjaClient
client = NinjaClient(
    kafka_servers="your-kafka-servers:9092",  # Change for your environment
    consumer_group="your-service-name"  # Unique name for your service
)
Configuration from Your Application's Config
from ninja_kafka_sdk import NinjaClient
from your_app.config import config  # Your application's config
client = NinjaClient(
    kafka_servers=config.KAFKA_SERVERS,
    consumer_group=config.KAFKA_CONSUMER_GROUP
)
Configuration with a NinjaKafkaConfig Object
from ninja_kafka_sdk import NinjaClient, NinjaKafkaConfig
# Create configuration object
config = NinjaKafkaConfig(
    kafka_servers="b-1.msk-cluster.amazonaws.com:9092,b-2.msk-cluster.amazonaws.com:9092",
    consumer_group="my-service",
    environment="stage",
    tasks_topic="ninja-tasks",
    results_topic="ninja-results"
)
# Use with client
client = NinjaClient(config=config)
🆕 Version 0.3.23+ Parameters Field Update
✅ IMPROVEMENT: The parameters field is now properly handled in send_task():
# From v0.3.23 onward, parameters go to the correct field
await client.send_task(
    task="linkedin_like",
    account_id=123,
    # These go to their proper fields (not nested in metadata!)
    parameters={'post_url': 'https://...'},  # → request.parameters
    api_endpoints={'callback': 'https://...'}  # → request.api_endpoints
)
🆕 Version 0.2.0 Breaking Changes
⚠️ BREAKING CHANGE: kafka_servers and consumer_group are now REQUIRED parameters.
Migration from v0.1.x
# OLD (v0.1.x) - Had auto-detection and localhost fallbacks
client = NinjaClient()  # ❌ This no longer works
# NEW (v0.2.0+) - Explicit configuration required
client = NinjaClient(
    kafka_servers="your-kafka-servers:9092",  # ✅ Required
    consumer_group="your-service-name"  # ✅ Required
)
Why This Change?
- Production Safety: Prevents localhost fallbacks in production environments
- Explicit Configuration: No more guessing what environment you're connecting to
- Debugging: Clear errors when configuration is missing
- Environment Agnostic: Same code works everywhere with different config
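The same fail-fast idea can be applied in application code before the client is constructed. The sketch below is an illustration only: `KafkaSettings`, `load_kafka_settings`, and the environment variable names `KAFKA_SERVERS` and `KAFKA_CONSUMER_GROUP` are assumptions made for this example and are not part of the SDK.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class KafkaSettings:
    """Hypothetical container for the two required connection values."""
    kafka_servers: str
    consumer_group: str


def load_kafka_settings(env: Mapping[str, str] = os.environ) -> KafkaSettings:
    """Read required settings and raise instead of falling back to localhost."""
    required = ("KAFKA_SERVERS", "KAFKA_CONSUMER_GROUP")  # assumed variable names
    missing = [name for name in required if not env.get(name, "").strip()]
    if missing:
        raise ValueError(f"Missing required Kafka settings: {', '.join(missing)}")
    return KafkaSettings(
        kafka_servers=env["KAFKA_SERVERS"].strip(),
        consumer_group=env["KAFKA_CONSUMER_GROUP"].strip(),
    )
```

The returned values could then be passed on as `NinjaClient(kafka_servers=settings.kafka_servers, consumer_group=settings.consumer_group)`, so a missing variable fails at startup rather than at the first send.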
💡 How to Send Tasks
Basic Task Execution
from ninja_kafka_sdk import NinjaClient
async def verify_linkedin_account():
    # Explicit configuration for production
    client = NinjaClient(
        kafka_servers="b-1.msk-cluster.amazonaws.com:9092,b-2.msk-cluster.amazonaws.com:9092",
        consumer_group="auto-login-service",
        environment="prod"
    )
    try:
        # Send task and wait for result (one method call)
        result = await client.execute_task(
            task="linkedin_verification",
            account_id=12345,
            email="user@example.com",
            timeout=300  # 5 minutes
        )
        if result.is_success:
            print("✅ Verification successful!")
            return result.cookies
        else:
            print(f"❌ Failed: {result.error_message}")
            return None
    finally:
        client.stop()
Advanced Usage Patterns
Fire and Forget
async def send_multiple_tasks():
    # Must provide explicit configuration
    client = NinjaClient(
        kafka_servers="localhost:9092",
        consumer_group="task-sender"
    )
    # Send task without waiting for result
    correlation_id = await client.send_task(
        task="linkedin_verification",
        account_id=123
    )
    print(f"Task sent: {correlation_id}")
    client.stop()
Batch Processing
async def process_multiple_accounts():
    client = NinjaClient(
        kafka_servers="your-kafka-servers:9092",
        consumer_group="batch-processor"
    )
    accounts = [123, 456, 789]
    try:
        # Send all tasks
        task_ids = []
        for account_id in accounts:
            task_id = await client.send_task("linkedin_verification", account_id=account_id)
            task_ids.append(task_id)
        # Listen for all results
        completed = 0
        async for result in client.listen_results(correlation_ids=task_ids):
            completed += 1
            print(f"Account {result.account_id}: {result.status}")
            if completed >= len(accounts):
                break
    finally:
        client.stop()
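The batch loop above counts every message it receives and waits indefinitely if a result never arrives. A slightly more defensive variant tracks the outstanding correlation IDs and gives up after a deadline. The sketch below is not SDK code: `collect_results` and `fake_stream` are hypothetical names, and it assumes results are dict-like with a `correlation_id` key, as in the listener examples earlier.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, Iterable


async def collect_results(
    results: AsyncIterator[Dict[str, Any]],
    correlation_ids: Iterable[str],
    timeout: float,
) -> Dict[str, Dict[str, Any]]:
    """Gather one result per correlation ID, ignoring duplicates and unknown IDs."""
    pending = set(correlation_ids)
    collected: Dict[str, Dict[str, Any]] = {}

    async def _drain() -> None:
        if not pending:
            return
        async for result in results:
            cid = result.get("correlation_id")
            if cid in pending:  # skip unknown or already-seen IDs
                pending.discard(cid)
                collected[cid] = result
            if not pending:  # every task has reported back
                break

    try:
        await asyncio.wait_for(_drain(), timeout=timeout)
    except asyncio.TimeoutError:
        pass  # return whatever arrived; the caller can compare against the IDs sent
    return collected


async def fake_stream() -> AsyncIterator[Dict[str, Any]]:
    """Stand-in for a result stream; includes a duplicate and an unrelated ID."""
    for cid in ["a", "a", "zzz", "b"]:
        await asyncio.sleep(0)
        yield {"correlation_id": cid, "status": "success"}
```

With a real client, the first argument would presumably be `client.listen_results(correlation_ids=task_ids)`; any ID absent from the returned dict can be treated as timed out.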
Different Environment Examples
# Local development
async def local_verification():
    client = NinjaClient(
        kafka_servers="localhost:9092",
        consumer_group="local-test",
        environment="local"  # Optional: for logging only
    )
    result = await client.execute_task("linkedin_verification", account_id=123)
    client.stop()
    return result
# Production environment
async def production_verification():
    client = NinjaClient(
        kafka_servers="b-1.msk-cluster.amazonaws.com:9092,b-2.msk-cluster.amazonaws.com:9092",
        consumer_group="auto-login-prod",
        environment="production"
    )
    result = await client.execute_task("linkedin_verification", account_id=123)
    client.stop()
    return result
# Using config object
async def config_based_verification():
    from your_app.config import config
    client = NinjaClient(
        kafka_servers=config.KAFKA_SERVERS,
        consumer_group=config.KAFKA_CONSUMER_GROUP
    )
    result = await client.execute_task("linkedin_verification", account_id=123)
    client.stop()
    return result
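In the three functions above, `client.stop()` is skipped if `execute_task` raises. One way to guarantee cleanup without repeating `try`/`finally` is a small context manager. This is a sketch under the assumption that `stop()` is a plain synchronous method, as it is used throughout this document; `stopping` and `FakeClient` are hypothetical names introduced only for illustration.

```python
from contextlib import contextmanager
from typing import Any, Callable, Iterator


@contextmanager
def stopping(factory: Callable[[], Any]) -> Iterator[Any]:
    """Create a client via `factory` and always call its stop() on exit."""
    client = factory()
    try:
        yield client
    finally:
        client.stop()


class FakeClient:
    """Stand-in used only to demonstrate the pattern without a broker."""

    def __init__(self) -> None:
        self.stopped = False

    def stop(self) -> None:
        self.stopped = True
```

Inside a coroutine this could be used as `with stopping(lambda: NinjaClient(kafka_servers=..., consumer_group=...)) as client:` followed by the usual `await client.execute_task(...)` call.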
🏗️ Available Tasks
LinkedIn Verification
result = await client.execute_task(
    task="linkedin_verification",
    account_id=123,
    email="user@example.com",  # Optional but highly recommended
    timeout=300  # 5 minutes
)
Future Tasks
More task types will be added for different platforms:
- twitter_verification
- instagram_verification
- facebook_verification
📝 Message Models
Task Request
from dataclasses import dataclass
from typing import Any, Dict, Optional
@dataclass
class NinjaTaskRequest:
    task: str  # "linkedin_verification"
    account_id: int  # Account ID
    correlation_id: str  # Auto-generated UUID
    email: Optional[str]  # Account email
    user_id: Optional[int]  # User ID
    metadata: Dict[str, Any]  # Additional parameters
Task Result
@dataclass
class NinjaTaskResult:
    correlation_id: str  # Matches request
    task: str  # Task type
    status: str  # "VERIFIED", "FAILED", etc.
    success: bool  # True if successful
    account_id: int  # Account ID
    cookies: Optional[str]  # Extracted cookies
    data: Optional[Dict]  # Additional result data
    error: Optional[Dict]  # Error details if failed
    @property
    def is_success(self) -> bool:
        return self.success or self.status == 'VERIFIED'
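To make the request model more concrete, the sketch below shows how a dataclass with these fields could be turned into a Kafka message value and back. It is an assumption-laden illustration: the SDK's actual wire format is not documented here, JSON is simply a common choice, and `TaskRequestSketch`, `to_bytes`, and `from_bytes` are hypothetical names.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, Optional


@dataclass
class TaskRequestSketch:
    """Hypothetical mirror of the request fields listed above."""
    task: str
    account_id: int
    correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    email: Optional[str] = None
    user_id: Optional[int] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_bytes(self) -> bytes:
        """Encode as UTF-8 JSON (assumed format, not confirmed by the SDK)."""
        return json.dumps(asdict(self)).encode("utf-8")

    @classmethod
    def from_bytes(cls, raw: bytes) -> "TaskRequestSketch":
        """Decode a message value back into a request object."""
        return cls(**json.loads(raw.decode("utf-8")))
```

One consequence of any JSON-based encoding is that tuples arrive as lists, so a value such as `(2, 5)` would be read back as `[2, 5]` on the receiving side.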
🚨 Error Handling
from ninja_kafka_sdk import (
    NinjaClient, NinjaTaskTimeoutError,
    NinjaTaskError, NinjaKafkaConnectionError
)
try:
    result = await client.execute_task("linkedin_verification", account_id=123)
except NinjaTaskTimeoutError:
    print("Task took too long")
except NinjaTaskError as e:
    print(f"Ninja couldn't complete task: {e.details}")
except NinjaKafkaConnectionError:
    print("Can't connect to Kafka")
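Timeouts and connection errors are often transient, so callers may want to retry rather than only print a message. The helper below is a generic sketch of retrying with exponential backoff; `retry_async` is a hypothetical name and the SDK is not known to ship anything like it.

```python
import asyncio
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_async(
    operation: Callable[[], Awaitable[T]],
    retry_on: Tuple[Type[BaseException], ...],
    attempts: int = 3,
    base_delay: float = 1.0,
) -> T:
    """Run `operation`, retrying on the given exceptions with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # Delay doubles each time: base_delay, 2*base_delay, 4*base_delay, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")
```

A call might look like `await retry_async(lambda: client.execute_task("linkedin_verification", account_id=123), retry_on=(NinjaTaskTimeoutError, NinjaKafkaConnectionError))`. Each retry sends the task again, so this only suits tasks that are safe to repeat.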
🔌 Extending for New Services
# Add new task types easily
await client.send_task(
    task="twitter_scraping",
    account_id=123,
    parameters={"target_user": "@elonmusk"}
)
# SDK handles routing to appropriate Ninja service
🔧 Troubleshooting
Common Configuration Issues
Issue: "Can't connect to Kafka"
# Check your servers configuration
from ninja_kafka_sdk.config import NinjaKafkaConfig
config = NinjaKafkaConfig()
print(f"Environment: {config.environment}")
print(f"Kafka servers: {config.kafka_servers}")
print(f"Consumer group: {config.consumer_group}")
Solutions:
- Local Development: Ensure Kafka is running on localhost:9092
- Stage/Prod: Verify KAFKA_STAGE_SERVERS or KAFKA_PROD_SERVERS are set
- Custom Provider: Use KAFKA_BOOTSTRAP_SERVERS for explicit override
Issue: "No messages received"
# Check consumer group conflicts
import os
from ninja_kafka_sdk import NinjaClient
print(f"Consumer group: {os.getenv('KAFKA_CONSUMER_GROUP', 'not set')}")
# Use a specific, unique consumer group (must be passed explicitly since v0.2.0)
client = NinjaClient(
    kafka_servers="your-kafka-servers:9092",
    consumer_group="my-unique-group"
)
Issue: "Task timeout"
# Increase the per-call timeout for slow operations
client = NinjaClient(
    kafka_servers="your-kafka-servers:9092",
    consumer_group="your-service-name"
)
result = await client.execute_task("linkedin_verification", account_id=123, timeout=600)  # 10 minutes
Environment Detection Debug
from ninja_kafka_sdk.config import NinjaKafkaConfig
# Debug environment detection
config = NinjaKafkaConfig()
print(f"Environment: {config.environment}")
print(f"Servers: {config.kafka_servers}")
# Force specific environment
config = NinjaKafkaConfig(environment='stage')
print(f"Forced stage servers: {config.kafka_servers}")
Quick Health Check
from ninja_kafka_sdk import NinjaClient
import asyncio
async def health_check():
    # kafka_servers and consumer_group are required since v0.2.0
    client = NinjaClient(
        kafka_servers="your-kafka-servers:9092",
        consumer_group="health-check"
    )
    try:
        # Test connection by sending a test message
        correlation_id = await client.send_task("health_check", account_id=0)
        print(f"✅ Connection OK - Test message sent: {correlation_id}")
        return True
    except Exception as e:
        print(f"❌ Connection failed: {e}")
        return False
    finally:
        client.stop()
# Run health check
asyncio.run(health_check())
📚 Appendix: For Service Implementers
This section contains information for developers implementing Ninja services (like browser-ninja) that process tasks and send results back.
Sending Task Results
If you're building a service that processes Ninja tasks, use these methods to send results:
from ninja_kafka_sdk import NinjaClient
async def send_verification_result():
    # Configure client for service that processes tasks
    client = NinjaClient(
        kafka_servers="your-kafka-servers:9092",
        consumer_group="browser-ninja",  # Service-specific consumer group
        environment="prod"
    )
    try:
        # Send success result
        await client.send_success_result(
            correlation_id="task-123-456",
            account_id=12345,
            email="user@example.com",
            cookies="extracted_cookies_data",
            screenshot="base64_screenshot"
        )
        # Or send error result
        await client.send_error_result(
            correlation_id="task-123-457",
            account_id=12346,
            email="user2@example.com",
            error_code="LOGIN_FAILED",
            error_message="Invalid credentials"
        )
    finally:
        client.stop()
Listening for Tasks (Future Feature)
from ninja_kafka_sdk import NinjaClient
async def process_ninja_tasks():
    client = NinjaClient(
        kafka_servers="your-kafka-servers:9092",
        consumer_group="browser-ninja"
    )
    try:
        # Listen for incoming tasks
        async for task in client.listen_tasks():
            print(f"📥 Received task: {task.task} for account {task.account_id}")
            # Process the task
            if task.task == "linkedin_verification":
                # process_linkedin_verification is your own handler, not part of the SDK
                result = await process_linkedin_verification(task)
                # Send result back
                if result["success"]:
                    await client.send_success_result(
                        correlation_id=task.correlation_id,
                        account_id=task.account_id,
                        email=task.email,
                        cookies=result["cookies"]
                    )
                else:
                    await client.send_error_result(
                        correlation_id=task.correlation_id,
                        account_id=task.account_id,
                        email=task.email,
                        error_code=result["error_code"],
                        error_message=result["error_message"]
                    )
    finally:
        client.stop()
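As a service grows beyond a single task type, the `if task.task == ...` chain above can be replaced by a handler registry. The sketch below shows one possible shape for that; it uses plain dicts in place of SDK task objects, and `handles`, `dispatch`, `HANDLERS`, and the `UNKNOWN_TASK` / `HANDLER_ERROR` codes are hypothetical names chosen for this example.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Handler = Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]
HANDLERS: Dict[str, Handler] = {}


def handles(task_name: str) -> Callable[[Handler], Handler]:
    """Register a coroutine as the handler for one task name."""
    def register(func: Handler) -> Handler:
        HANDLERS[task_name] = func
        return func
    return register


@handles("linkedin_verification")
async def verify(task: Dict[str, Any]) -> Dict[str, Any]:
    # Placeholder logic; a real handler would do the actual verification here.
    return {"success": True, "cookies": f"cookies-for-{task['account_id']}"}


async def dispatch(task: Dict[str, Any]) -> Dict[str, Any]:
    """Route a task to its handler, or return an error-shaped result."""
    handler = HANDLERS.get(task["task"])
    if handler is None:
        return {"success": False, "error_code": "UNKNOWN_TASK",
                "error_message": f"No handler for {task['task']!r}"}
    try:
        return await handler(task)
    except Exception as exc:  # keep the consumer loop alive on handler failure
        return {"success": False, "error_code": "HANDLER_ERROR",
                "error_message": str(exc)}
```

The returned dict has the same `success` / `cookies` / `error_code` / `error_message` keys the loop above already inspects, so its output could feed `send_success_result` or `send_error_result` directly.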
The Ninja Kafka SDK simplifies task-based communication while maintaining enterprise-grade reliability.