A micro library for building Camunda connectors in Python using decorators
Project description
Camunda Connector Python Library
A micro library for building Camunda connectors in Python using decorators. This library implements the Camunda Connector Server Protocol and allows you to create connectors with minimal boilerplate code.
Features
- Simple decorator-based API - Define connectors using
@connectordecorators - Type-safe - Uses Pydantic models for input/output validation
- Auto-generated endpoints - Automatically creates HTTP endpoints following the CSP protocol
- Async support - Supports both synchronous and asynchronous connector functions
- Error handling - Built-in error handling with proper JSON-RPC error responses
- Callback support - Handles asynchronous processing with callback URLs
Installation
pip install camunda-connector-python
Quick Start
Here's a simple example of creating a connector:
from pydantic import BaseModel
from camunda_connector import connector, ConnectorServer, ConnectorInput, ConnectorOutput
# Define input and output models
class CalculatorInput(ConnectorInput):
action: str # "add", "subtract", "multiply", "divide"
a: float
b: float
class CalculatorOutput(ConnectorOutput):
result: float
action: str
# Define the connector
@connector(name="io.example:calculator:1")
def calculate(input_data: CalculatorInput) -> CalculatorOutput:
actions = {
"add": lambda a, b: a + b,
"subtract": lambda a, b: a - b,
"multiply": lambda a, b: a * b,
"divide": lambda a, b: a / b if b != 0 else None
}
if input_data.action not in actions:
raise ValueError(f"Unsupported action: {input_data.action}")
result = actions[input_data.action](input_data.a, input_data.b)
return CalculatorOutput(
result=result,
action=input_data.action
)
# Create and run the server
if __name__ == "__main__":
server = ConnectorServer(title="Calculator Connector", version="1.0.0")
server.run(host="0.0.0.0", port=8080)
Usage
1. Define Input/Output Models
Create Pydantic models that inherit from ConnectorInput and ConnectorOutput:
from camunda_connector import ConnectorInput, ConnectorOutput
class MyConnectorInput(ConnectorInput):
message: str
count: int = 1
class MyConnectorOutput(ConnectorOutput):
result: str
processed_count: int
2. Create Connector Functions
Use the @connector decorator to register your connector functions:
@connector(name="io.example:my-connector:1")
def my_connector(input_data: MyConnectorInput) -> MyConnectorOutput:
# Your connector logic here
result = f"Processed: {input_data.message}"
return MyConnectorOutput(
result=result,
processed_count=input_data.count
)
3. Async Connectors
You can also create asynchronous connectors:
@connector(name="io.example:async-connector:1")
async def async_connector(input_data: MyConnectorInput) -> MyConnectorOutput:
import asyncio
await asyncio.sleep(1) # Simulate async work
return MyConnectorOutput(
result="Async result",
processed_count=input_data.count
)
4. Run the Server
Create a ConnectorServer instance and run it:
server = ConnectorServer(title="My Connectors", version="1.0.0")
server.run(host="0.0.0.0", port=8080)
API Endpoints
The server automatically creates the following endpoints:
POST /csp/connector- Invoke a connector (connector type specified in method parameter)GET /csp/connectors- List all registered connectorsGET /health- Health check endpoint
Making Requests
To invoke a connector, send a JSON-RPC request to the connector endpoint:
curl -X POST http://localhost:8080/csp/connector \
-H "Content-Type: application/json" \
-d '{
"jsonrpc": "2.0",
"id": 12345,
"method": "io.example:calculator:1",
"params": {
"input": {
"action": "add",
"a": 5,
"b": 3
}
}
}'
Response:
{
"jsonrpc": "2.0",
"id": 12345,
"result": {
"output": {
"result": 8.0,
"action": "add"
},
"statusCode": 200,
"error": null
}
}
Error Handling
The library provides built-in error handling. You can raise ConnectorError for custom errors:
from camunda_connector import ConnectorError
@connector(name="io.example:validator:1")
def validate_data(input_data: MyInput) -> MyOutput:
if not input_data.message:
raise ConnectorError(
message="Message cannot be empty",
code=-32602,
data={"field": "message"}
)
# ... rest of the logic
Asynchronous Processing
For long-running operations, you can use asynchronous processing with callbacks:
@connector(name="io.example:long-task:1")
def long_running_task(input_data: MyInput) -> MyOutput:
# The framework will automatically handle the callback
# if a callbackUrl is provided in the request
import time
time.sleep(10) # Simulate long work
return MyOutput(result="Task completed")
When a callbackUrl is provided in the request, the connector will:
- Return a 202 Accepted response immediately
- Process the connector function
- Send the result to the callback URL when complete
Connector Server Protocol
Connector Server Protocol is a JSON RPC protocol that allows the Camunda Connector runtime to invoke external connectors. It is designed to be used with the Camunda Connector runtime, and offers the benefit of abstracting the underlying Zeebe API and job worker context, while still allowing to use the classic variable binding and secret resolution features of the Camunda Connector runtime.
An external connector is an HTTP server that implements the Connector Server Protocol.
Connector endpoint URL
The URL of the connector endpoint should follow the standardized format:
https://<host>:<port>/csp/connector
Where <host> is the hostname or IP address of the connector server, and <port> is the port on which the connector server is listening.
Request object
{
"jsonrpc": "2.0",
"id": 99999999,
"method": "io.camunda:connector-type:1",
"params": {
"input": {
"key1": "value1",
"key2": "value2"
},
"callbackUrl": "https://bru-2.connectors.camunda.io/csp/callback"
}
}
Response object
{
"jsonrpc": "2.0",
"id": 99999999,
"result": {
"output": {
"key1": "value1",
"key2": "value2"
},
"statusCode": 200,
"error": null
}
}
Synchronous response
The connector can respond synchronously by returning a response object with the result field and
the statusCode field set to 200 OK.
Asynchronous response
The connector can respond asynchronously by returning a 202 Accepted response. In this case, the
connector must issue a callback to the callbackUrl provided in the request with the result
object.
Error response object
{
"jsonrpc": "2.0",
"id": 99999999,
"error": {
"code": -32603,
"message": "Internal error",
"data": {
"errorMessage": "Invalid input for connector type io.camunda:connector-type:1"
}
}
}
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file camunda_connector_python-0.2.0.tar.gz.
File metadata
- Download URL: camunda_connector_python-0.2.0.tar.gz
- Upload date:
- Size: 15.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
096ec588ff59c7fafff95b60ef8dafaabbaa05c672e1f8368c475fb8f8869bab
|
|
| MD5 |
f66756f70a3ae9de338041cb19cb9e57
|
|
| BLAKE2b-256 |
8b180b84c8fe74b7f4c94f2bc64aded36f7e275c134f94ab0b3115b7a9e634e2
|
File details
Details for the file camunda_connector_python-0.2.0-py3-none-any.whl.
File metadata
- Download URL: camunda_connector_python-0.2.0-py3-none-any.whl
- Upload date:
- Size: 9.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
bd436710488629d0a7bd33c113ce1d77322094762e90e0eaeefbd1cb9979ee5b
|
|
| MD5 |
076501e426fe23fb22c3ce481e8b178e
|
|
| BLAKE2b-256 |
6d8564c0aad465c45b7809f169ab60961bbbe2b50fde714b05044d860336567b
|