nahook

Official Python SDK for the Nahook webhook platform.

Two classes, one package:

Class             Purpose                              Auth
NahookClient      Send and trigger webhook events      API key (nhk_us_...)
NahookManagement  Manage endpoints, event types, apps  Management token (nhm_...)

Requirements

  • Python 3.9+
  • httpx (installed automatically)

Installation

pip install nahook

NahookClient

Send webhooks to specific endpoints or fan out by event type.

Setup

from nahook import NahookClient

# Simple
client = NahookClient("nhk_us_...")

# With options
client = NahookClient("nhk_us_...", retries=3, timeout=5_000)
# retries: default 0 (no retries)
# timeout: default 30_000ms

Configuration

The SDK automatically routes requests to the correct regional API based on your API key prefix (nhk_us_... -> US, nhk_eu_... -> EU, nhk_ap_... -> Asia Pacific). No configuration needed.
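The prefix-based routing described above can be pictured as a simple lookup. This is only an illustrative sketch: `region_for_key` and `REGION_BY_PREFIX` are hypothetical names, not part of the SDK, and the SDK's actual routing logic and regional URLs are not shown in this README.

```python
# Hypothetical illustration of prefix-based region routing; not SDK code.
REGION_BY_PREFIX = {
    "nhk_us_": "US",
    "nhk_eu_": "EU",
    "nhk_ap_": "Asia Pacific",
}


def region_for_key(api_key: str) -> str:
    """Return the region name implied by an API key prefix."""
    for prefix, region in REGION_BY_PREFIX.items():
        if api_key.startswith(prefix):
            return region
    raise ValueError("unrecognized API key prefix")
```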

To override the base URL (for testing or local development):

client = NahookClient("nhk_us_...", base_url="http://localhost:3001")

For unit tests, mock the SDK client at the dependency injection boundary. For integration tests, override the base URL to point at a local server.
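As a rough sketch of the unit-test approach, the example below passes a `unittest.mock.Mock` in place of the client. `notify_order_paid` is a hypothetical piece of application code invented for this illustration; the return value mirrors the shape of the `send()` result shown later in this README.

```python
from unittest.mock import Mock


def notify_order_paid(client, order_id: str) -> str:
    """Hypothetical application code that receives the client as a dependency."""
    result = client.send("ep_abc123", {"orderId": order_id, "status": "paid"})
    return result["deliveryId"]


def test_notify_order_paid():
    # Stand-in for NahookClient; no network calls are made.
    fake_client = Mock()
    fake_client.send.return_value = {
        "deliveryId": "del_test",
        "idempotencyKey": "key_test",
        "status": "accepted",
    }

    assert notify_order_paid(fake_client, "123") == "del_test"
    fake_client.send.assert_called_once_with(
        "ep_abc123", {"orderId": "123", "status": "paid"}
    )
```

Because the application code only depends on an object with a `send` method, the test never needs an API key or a running server.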

Graceful shutdown — with statement and close()

Both NahookClient and NahookManagement are context managers. The idiomatic Python pattern is a with statement, which closes the underlying httpx.Client's connection pool on exit:

with NahookClient("nhk_us_...") as client:
    client.send("ep_abc123", payload={"orderId": "123", "status": "paid"})
# httpx.Client.close() automatically called here

If a with block doesn't fit your lifecycle (e.g., a long-lived module-scope client), call close() explicitly during graceful shutdown:

import atexit

client = NahookClient("nhk_us_...")
atexit.register(client.close)

# ... use client across the process lifetime ...

close() drains pooled connections and is safe to call multiple times. The same pattern works on NahookManagement. Skipping close() is fine in short-lived scripts — the OS reaps sockets on process exit — but matters for test harnesses, graceful shutdown handlers, or any process that recycles clients during its lifetime.

Send to a specific endpoint

result = client.send("ep_abc123", {
    "orderId": "123",
    "status": "paid",
}, idempotency_key="order-123-paid")  # optional, auto-generated UUID if omitted

# {"deliveryId": "del_...", "idempotencyKey": "order-123-paid", "status": "accepted"}

Fan-out by event type

result = client.trigger("order.paid", {
    "orderId": "123",
    "status": "paid",
}, metadata={"region": "us-east-1"})  # optional

# {"eventTypeId": "evt_...", "deliveryIds": ["del_..."], "status": "accepted"}

Batch operations

# Send to multiple endpoints (max 20 items)
batch = client.send_batch([
    {"endpointId": "ep_abc", "payload": {"orderId": "123"}},
    {"endpointId": "ep_def", "payload": {"orderId": "456"}},
])

# Fan-out multiple event types (max 20 items)
fan_out = client.trigger_batch([
    {"eventType": "order.paid", "payload": {"orderId": "123"}},
    {"eventType": "order.shipped", "payload": {"orderId": "456"}},
])

# Results: 202 (all succeed) or 207 (mixed)
for item in batch["items"]:
    if "error" in item:
        print(f"Item {item['index']} failed: {item['error']['code']}")
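Since each batch call accepts at most 20 items, larger workloads need to be split before sending. The helpers below are a minimal sketch of that, plus a way to separate failed items from a batch response. `chunked` and `split_results` are hypothetical helper names, not SDK functions; the response shape (`items`, `index`, `error`) is taken from the example above.

```python
from typing import Iterator, List, Tuple

MAX_BATCH_ITEMS = 20  # per-call limit stated in this README


def chunked(items: List[dict], size: int = MAX_BATCH_ITEMS) -> Iterator[List[dict]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def split_results(batch: dict) -> Tuple[List[dict], List[dict]]:
    """Separate succeeded and failed items of one batch response."""
    succeeded = [item for item in batch["items"] if "error" not in item]
    failed = [item for item in batch["items"] if "error" in item]
    return succeeded, failed
```

With a real client, each chunk would be passed to `client.send_batch(chunk)` or `client.trigger_batch(chunk)` in turn.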

Retry behavior

Retries are opt-in via the retries constructor parameter. When enabled:

  • Strategy: Exponential backoff with full jitter
  • Delays: 500ms base, 10s max
  • Retryable: 5xx, 429 (respects Retry-After), network errors, timeouts
  • Non-retryable: 400, 401, 403, 404, 409, 413
  • Safe by design: Idempotency keys are always sent, making retries safe
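To make the delay schedule concrete, here is a minimal sketch of exponential backoff with full jitter using the 500ms base and 10s cap listed above. The exact formula the SDK uses is not documented here, so treat `backoff_delay_ms` as a hypothetical illustration of the general technique; it does not model Retry-After handling.

```python
import random
from typing import Optional

BASE_DELAY_MS = 500     # base delay from the list above
MAX_DELAY_MS = 10_000   # cap from the list above


def backoff_delay_ms(attempt: int, rng: Optional[random.Random] = None) -> float:
    """Return a full-jitter delay in milliseconds for a zero-based retry attempt."""
    source = rng if rng is not None else random
    # The ceiling doubles with each attempt until it reaches the cap.
    ceiling = min(MAX_DELAY_MS, BASE_DELAY_MS * (2 ** attempt))
    # Full jitter: pick uniformly between zero and the ceiling.
    return source.uniform(0, ceiling)
```

Full jitter spreads retries from many clients across the whole window, which avoids synchronized retry bursts after an outage.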

NahookManagement

Programmatically manage your Nahook workspace resources.

Setup

from nahook import NahookManagement

# Simple
mgmt = NahookManagement("nhm_...")

# With options
mgmt = NahookManagement("nhm_...", timeout=10_000)
# Note: retries are not supported for management calls

Endpoints

result = mgmt.endpoints.list("ws_abc")
endpoints = result["data"]

endpoint = mgmt.endpoints.create("ws_abc",
    url="https://example.com/webhooks",
    description="Production webhook",
    type_="webhook",  # "webhook" | "slack"
    metadata={"team": "payments"},
)

endpoint = mgmt.endpoints.get("ws_abc", "ep_123")

mgmt.endpoints.update("ws_abc", "ep_123",
    description="Updated",
    is_active=False,
)

mgmt.endpoints.delete("ws_abc", "ep_123")

Event Types

result = mgmt.event_types.list("ws_abc")

event_type = mgmt.event_types.create("ws_abc",
    name="order.paid",
    description="Fired when an order is paid",
)

event_type = mgmt.event_types.get("ws_abc", "evt_123")

mgmt.event_types.update("ws_abc", "evt_123",
    description="Updated description",
)

mgmt.event_types.delete("ws_abc", "evt_123")

Applications

result = mgmt.applications.list("ws_abc", limit=50, offset=0)

app = mgmt.applications.create("ws_abc",
    name="Acme Corp",
    external_id="acme-123",
    metadata={"tier": "pro"},
)

app = mgmt.applications.get("ws_abc", "app_123")

mgmt.applications.update("ws_abc", "app_123", name="Acme Inc")

mgmt.applications.delete("ws_abc", "app_123")

# Endpoints scoped to an application
result = mgmt.applications.list_endpoints("ws_abc", "app_123")
ep = mgmt.applications.create_endpoint("ws_abc", "app_123",
    url="https://acme.com/webhooks",
)

Subscriptions

result = mgmt.subscriptions.list("ws_abc", "ep_123")

mgmt.subscriptions.create("ws_abc", "ep_123", event_type_ids=["evt_456"])

mgmt.subscriptions.delete("ws_abc", "ep_123", "evt_456")

Environments

result = mgmt.environments.list("ws_abc")

env = mgmt.environments.create("ws_abc",
    name="Staging",
    slug="staging",
)

env = mgmt.environments.get("ws_abc", "env_123")

mgmt.environments.update("ws_abc", "env_123", name="Pre-production")

mgmt.environments.delete("ws_abc", "env_123")

Event Type Visibility

Control which event types are visible per environment.

result = mgmt.environments.list_event_type_visibility("ws_abc", "env_123")

vis = mgmt.environments.set_event_type_visibility("ws_abc", "env_123", "evt_456",
    published=True,
)
# {"eventTypeId": "evt_456", "eventTypeName": "order.paid", "published": True}

Deliveries

Read access to a workspace's webhook deliveries — list, fetch, and inspect attempts. There is no create/update/delete on this resource.

# Page through an endpoint's deliveries (newest-first).
result = mgmt.deliveries.list("ws_abc", "ep_123", limit=50)
for delivery in result["data"]:
    print(delivery["id"], delivery["status"])

# next_cursor is opaque — pass it through verbatim to fetch the next page.
# It is None when there are no more pages.
if result["next_cursor"] is not None:
    next_page = mgmt.deliveries.list(
        "ws_abc", "ep_123", limit=50, cursor=result["next_cursor"]
    )

# Filter by status.
failed = mgmt.deliveries.list("ws_abc", "ep_123", status="failed")

# Fetch a single delivery's metadata.
delivery = mgmt.deliveries.get("ws_abc", "del_abc")
print(delivery["status"], delivery["totalAttempts"])

# Fetch with the stored payload envelope. The envelope's "status" field reports
# whether the payload is accessible; only "available" envelopes include "data".
delivery = mgmt.deliveries.get("ws_abc", "del_abc", include_payload=True)
envelope = delivery["payload"]
if envelope["status"] == "available":
    print(envelope["data"], envelope["contentType"])
elif envelope["status"] == "forbidden":
    print("Workspace plan does not include payload storage")
elif envelope["status"] == "processing":
    print("Delivery still in flight — try again shortly")
elif envelope["status"] == "not_found":
    print("No stored payload for this delivery")
elif envelope["status"] == "error":
    print("Transient storage failure")

# List a delivery's attempts (chronological, oldest first).
attempts = mgmt.deliveries.get_attempts("ws_abc", "del_abc")
for attempt in attempts:
    print(
        attempt["attemptNumber"],
        attempt["status"],
        attempt["responseStatusCode"],
    )
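The cursor handling shown above generalizes to a loop that follows next_cursor until it is None. The generator below is a sketch of that pattern; `iter_all` and `fake_fetch` are hypothetical names, and the fake pages stand in for real API responses.

```python
from typing import Callable, Iterator, Optional


def iter_all(fetch_page: Callable[[Optional[str]], dict]) -> Iterator[dict]:
    """Yield every item across pages, following next_cursor until it is None."""
    cursor = None
    while True:
        page = fetch_page(cursor)
        yield from page["data"]
        cursor = page["next_cursor"]
        if cursor is None:
            break


# Stand-in for a paginated list call, backed by two fake pages.
_FAKE_PAGES = {
    None: {"data": [{"id": "del_1"}, {"id": "del_2"}], "next_cursor": "c1"},
    "c1": {"data": [{"id": "del_3"}], "next_cursor": None},
}


def fake_fetch(cursor: Optional[str]) -> dict:
    return _FAKE_PAGES[cursor]


delivery_ids = [delivery["id"] for delivery in iter_all(fake_fetch)]
```

With a real client, `fetch_page` could wrap `mgmt.deliveries.list(...)`. Whether that method accepts `cursor=None` for the first page is an assumption this README does not confirm, so the wrapper may need to omit the argument on the first call.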

Portal Sessions

session = mgmt.portal_sessions.create("ws_abc", "app_123",
    metadata={"userId": "user-456"},
)
# session["url"]        -> redirect end-user here
# session["code"]       -> one-time exchange code
# session["expiresAt"]  -> expiration timestamp

Error Handling

All SDK errors extend NahookError. Three specific types cover every failure mode:

from nahook import NahookAPIError, NahookNetworkError, NahookTimeoutError

try:
    client.send("ep_abc", {"key": "value"})
except NahookAPIError as err:
    # API returned an error response
    print(err.status)        # 404
    print(err.code)          # "not_found"
    print(str(err))          # "Endpoint not found"
    print(err.retry_after)   # seconds (on 429s)

    # Convenience checks
    err.is_retryable       # True for 5xx, 429
    err.is_auth_error      # True for 401, 403 (token_disabled)
    err.is_not_found       # True for 404
    err.is_rate_limited    # True for 429
    err.is_validation_error  # True for 400
except NahookNetworkError as err:
    print(err.cause)  # original httpx error
except NahookTimeoutError as err:
    print(err.timeout_ms)  # timeout that was exceeded
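One way to use the convenience checks is to map an error to a coarse next step. The sketch below uses a stand-in exception so it runs without the SDK: `FakeAPIError` and `next_action` are hypothetical, and the flag logic simply mirrors the status codes listed in the comments above rather than the SDK's actual implementation.

```python
from typing import Optional


class FakeAPIError(Exception):
    """Hypothetical stand-in with fields this README documents on NahookAPIError."""

    def __init__(self, status: int, retry_after: Optional[float] = None):
        super().__init__(f"HTTP {status}")
        self.status = status
        self.retry_after = retry_after

    @property
    def is_retryable(self) -> bool:
        return self.status == 429 or 500 <= self.status <= 599

    @property
    def is_auth_error(self) -> bool:
        return self.status in (401, 403)


def next_action(err) -> str:
    """Map an API error to a coarse application-level decision."""
    if err.is_retryable:
        if err.retry_after is not None:
            return f"wait {err.retry_after}s, then retry"
        return "retry with backoff"
    if err.is_auth_error:
        return "check credentials"
    return "fix the request"
```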

Webhook Verification

Nahook signs outgoing deliveries using the Standard Webhooks specification. Use the standardwebhooks package to verify incoming webhooks:

pip install standardwebhooks

from standardwebhooks import Webhook

wh = Webhook("whsec_MfKQ9r8GKYqr...")

try:
    payload = wh.verify(request.body, request.headers)
    # Verified and safe to use
except Exception:
    # Invalid signature
    pass

The signing secret (whsec_...) is available in your Nahook Dashboard endpoint settings.
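For readers curious about what verification involves, the sketch below reproduces the signature check using only the standard library, based on the public Standard Webhooks specification: an HMAC-SHA256 over "id.timestamp.body", keyed with the base64-decoded secret. It is an illustration, not a replacement for the standardwebhooks package; `compute_signature` and `signature_matches` are hypothetical names, the demo secret is made up, and timestamp tolerance checks are omitted.

```python
import base64
import hashlib
import hmac

SECRET_PREFIX = "whsec_"


def compute_signature(secret: str, msg_id: str, timestamp: str, body: bytes) -> str:
    """Build a 'v1,<base64>' signature over '<id>.<timestamp>.<body>'."""
    # The key is the base64-decoded secret with the "whsec_" prefix removed.
    raw = secret[len(SECRET_PREFIX):] if secret.startswith(SECRET_PREFIX) else secret
    key = base64.b64decode(raw)
    signed_content = f"{msg_id}.{timestamp}.".encode("utf-8") + body
    digest = hmac.new(key, signed_content, hashlib.sha256).digest()
    return "v1," + base64.b64encode(digest).decode("ascii")


def signature_matches(secret: str, msg_id: str, timestamp: str,
                      body: bytes, signature_header: str) -> bool:
    """Check the body against each space-separated signature in the header."""
    expected = compute_signature(secret, msg_id, timestamp, body).encode("ascii")
    return any(
        hmac.compare_digest(expected, candidate.encode("utf-8"))
        for candidate in signature_header.split()
    )


# Demo values invented for this example only.
demo_secret = SECRET_PREFIX + base64.b64encode(b"demo-key-for-illustration").decode("ascii")
demo_body = b'{"orderId": "123"}'
demo_header = compute_signature(demo_secret, "msg_1", "1700000000", demo_body)
```

Verification must run against the raw request body bytes; re-serializing parsed JSON can change the bytes and invalidate the signature.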


Development

pip install -e ".[dev]"   # install with dev dependencies
pytest                     # run tests

License

MIT
