Skip to main content

Official Python SDK for the Nahook webhook platform

Project description

nahook

Official Python SDK for the Nahook webhook platform.

Two classes, one package:

Class Purpose Auth
NahookClient Send and trigger webhook events API key (nhk_us_...)
NahookManagement Manage endpoints, event types, apps Management token (nhm_...)

Requirements

  • Python 3.9+
  • httpx (installed automatically)

Installation

pip install nahook

NahookClient

Send webhooks to specific endpoints or fan-out by event type.

Setup

from nahook import NahookClient

# Simple
client = NahookClient("nhk_us_...")

# With options
client = NahookClient("nhk_us_...", retries=3, timeout=5_000)
# retries: default 0 (no retries)
# timeout: default 30_000ms

Configuration

The SDK automatically routes requests to the correct regional API based on your API key prefix (nhk_us_... -> US, nhk_eu_... -> EU, nhk_ap_... -> Asia Pacific). No configuration needed.

To override the base URL (for testing or local development):

client = NahookClient("nhk_us_...", base_url="http://localhost:3001")

For unit tests, mock the SDK client at the dependency injection boundary. For integration tests, override the base URL to point at a local server.

Graceful shutdown — with statement and close()

NahookClient and NahookManagement are context managers — the idiomatic Python pattern is with, which closes the underlying httpx.Client's connection pool on exit:

with NahookClient("nhk_us_...") as client:
    client.send("ep_abc123", payload={"orderId": "123", "status": "paid"})
# httpx.Client.close() automatically called here

If with doesn't fit your lifecycle (e.g., a long-lived module-scope client), call close() explicitly during graceful shutdown:

import atexit

client = NahookClient("nhk_us_...")
atexit.register(client.close)

# ... use client across the process lifetime ...

close() drains pooled connections and is safe to call multiple times. The same pattern works on NahookManagement. Skipping close() is fine in short-lived scripts — the OS reaps sockets on process exit — but matters for test harnesses, graceful shutdown handlers, or any process that recycles clients during its lifetime.

Send to a specific endpoint

result = client.send("ep_abc123", {
    "orderId": "123",
    "status": "paid",
}, idempotency_key="order-123-paid")  # optional, auto-generated UUID if omitted

# {"deliveryId": "del_...", "idempotencyKey": "order-123-paid", "status": "accepted"}

Fan-out by event type

result = client.trigger("order.paid", {
    "orderId": "123",
    "status": "paid",
}, metadata={"region": "us-east-1"})  # optional

# {"eventTypeId": "evt_...", "deliveryIds": ["del_..."], "status": "accepted"}

Batch operations

# Send to multiple endpoints (max 20 items)
batch = client.send_batch([
    {"endpointId": "ep_abc", "payload": {"orderId": "123"}},
    {"endpointId": "ep_def", "payload": {"orderId": "456"}},
])

# Fan-out multiple event types (max 20 items)
fan_out = client.trigger_batch([
    {"eventType": "order.paid", "payload": {"orderId": "123"}},
    {"eventType": "order.shipped", "payload": {"orderId": "456"}},
])

# Results: 202 (all succeed) or 207 (mixed)
for item in batch["items"]:
    if "error" in item:
        print(f"Item {item['index']} failed: {item['error']['code']}")

Retry behavior

Retries are opt-in via the retries constructor parameter. When enabled:

  • Strategy: Exponential backoff with full jitter
  • Delays: 500ms base, 10s max
  • Retryable: 5xx, 429 (respects Retry-After), network errors, timeouts
  • Non-retryable: 400, 401, 403, 404, 409, 413
  • Safe by design: Idempotency keys are always sent, making retries safe

NahookManagement

Programmatically manage your Nahook workspace resources.

Setup

from nahook import NahookManagement

# Simple
mgmt = NahookManagement("nhm_...")

# With options
mgmt = NahookManagement("nhm_...", timeout=10_000)
# Note: retries are not supported for management calls

Endpoints

result = mgmt.endpoints.list("ws_abc")
endpoints = result["data"]

endpoint = mgmt.endpoints.create("ws_abc",
    url="https://example.com/webhooks",
    description="Production webhook",
    type_="webhook",  # "webhook" | "slack"
)

endpoint = mgmt.endpoints.get("ws_abc", "ep_123")

mgmt.endpoints.update("ws_abc", "ep_123",
    description="Updated",
    is_active=False,
)

mgmt.endpoints.delete("ws_abc", "ep_123")

Event Types

result = mgmt.event_types.list("ws_abc")

event_type = mgmt.event_types.create("ws_abc",
    name="order.paid",
    description="Fired when an order is paid",
)

event_type = mgmt.event_types.get("ws_abc", "evt_123")

mgmt.event_types.update("ws_abc", "evt_123",
    description="Updated description",
)

mgmt.event_types.delete("ws_abc", "evt_123")

Applications

result = mgmt.applications.list("ws_abc", limit=50, offset=0)

app = mgmt.applications.create("ws_abc",
    name="Acme Corp",
    external_id="acme-123",
    metadata={"tier": "pro"},
)

app = mgmt.applications.get("ws_abc", "app_123")

mgmt.applications.update("ws_abc", "app_123", name="Acme Inc")

mgmt.applications.delete("ws_abc", "app_123")

# Endpoints scoped to an application
result = mgmt.applications.list_endpoints("ws_abc", "app_123")
ep = mgmt.applications.create_endpoint("ws_abc", "app_123",
    url="https://acme.com/webhooks",
)

Subscriptions

result = mgmt.subscriptions.list("ws_abc", "ep_123")

mgmt.subscriptions.create("ws_abc", "ep_123", event_type_ids=["evt_456"])

mgmt.subscriptions.delete("ws_abc", "ep_123", "evt_456")

Environments

result = mgmt.environments.list("ws_abc")

env = mgmt.environments.create("ws_abc",
    name="Staging",
    slug="staging",
)

env = mgmt.environments.get("ws_abc", "env_123")

mgmt.environments.update("ws_abc", "env_123", name="Pre-production")

mgmt.environments.delete("ws_abc", "env_123")

Event Type Visibility

Control which event types are visible per environment.

result = mgmt.environments.list_event_type_visibility("ws_abc", "env_123")

vis = mgmt.environments.set_event_type_visibility("ws_abc", "env_123", "evt_456",
    published=True,
)
# {"eventTypeId": "evt_456", "eventTypeName": "order.paid", "published": True}

Deliveries

Read access to a workspace's webhook deliveries — list, fetch, and inspect attempts. There is no create/update/delete on this resource.

# Page through an endpoint's deliveries (newest-first).
result = mgmt.deliveries.list("ws_abc", "ep_123", limit=50)
for delivery in result["data"]:
    print(delivery["id"], delivery["status"])

# next_cursor is opaque — pass it through verbatim to fetch the next page.
# It is None when there are no more pages.
if result["next_cursor"] is not None:
    next_page = mgmt.deliveries.list(
        "ws_abc", "ep_123", limit=50, cursor=result["next_cursor"]
    )

# Filter by status.
failed = mgmt.deliveries.list("ws_abc", "ep_123", status="failed")

# Fetch a single delivery's metadata.
delivery = mgmt.deliveries.get("ws_abc", "del_abc")
print(delivery["status"], delivery["totalAttempts"])

# Fetch with the stored payload envelope. The envelope's ``status`` carries
# the access-level reality — only ``"available"`` payloads include ``data``.
delivery = mgmt.deliveries.get("ws_abc", "del_abc", include_payload=True)
envelope = delivery["payload"]
if envelope["status"] == "available":
    print(envelope["data"], envelope["contentType"])
elif envelope["status"] == "forbidden":
    print("Workspace plan does not include payload storage")
elif envelope["status"] == "processing":
    print("Delivery still in flight — try again shortly")
elif envelope["status"] == "not_found":
    print("No stored payload for this delivery")
elif envelope["status"] == "error":
    print("Transient storage failure")

# List a delivery's attempts (chronological, oldest first).
attempts = mgmt.deliveries.get_attempts("ws_abc", "del_abc")
for attempt in attempts:
    print(
        attempt["attemptNumber"],
        attempt["status"],
        attempt["responseStatusCode"],
    )

Portal Sessions

session = mgmt.portal_sessions.create("ws_abc", "app_123",
    metadata={"userId": "user-456"},
)
# session["url"]        -> redirect end-user here
# session["code"]       -> one-time exchange code
# session["expiresAt"]  -> expiration timestamp

Error Handling

All SDK errors extend NahookError. Three specific types cover every failure mode:

from nahook import NahookAPIError, NahookNetworkError, NahookTimeoutError

try:
    client.send("ep_abc", {"key": "value"})
except NahookAPIError as err:
    # API returned an error response
    print(err.status)        # 404
    print(err.code)          # "not_found"
    print(str(err))          # "Endpoint not found"
    print(err.retry_after)   # seconds (on 429s)

    # Convenience checks
    err.is_retryable       # True for 5xx, 429
    err.is_auth_error      # True for 401, 403 (token_disabled)
    err.is_not_found       # True for 404
    err.is_rate_limited    # True for 429
    err.is_validation_error  # True for 400
except NahookNetworkError as err:
    print(err.cause)  # original httpx error
except NahookTimeoutError as err:
    print(err.timeout_ms)  # timeout that was exceeded

Webhook Verification

Nahook signs outgoing deliveries using the Standard Webhooks specification. Use the standardwebhooks package to verify incoming webhooks:

pip install standardwebhooks
from standardwebhooks import Webhook

wh = Webhook("whsec_MfKQ9r8GKYqr...")

try:
    payload = wh.verify(request.body, request.headers)
    # Verified and safe to use
except Exception:
    # Invalid signature
    pass

The signing secret (whsec_...) is available in your Nahook Dashboard endpoint settings.


Development

pip install -e ".[dev]"   # install with dev dependencies
pytest                     # run tests

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

nahook-0.1.4.tar.gz (57.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

nahook-0.1.4-py3-none-any.whl (20.4 kB view details)

Uploaded Python 3

File details

Details for the file nahook-0.1.4.tar.gz.

File metadata

  • Download URL: nahook-0.1.4.tar.gz
  • Upload date:
  • Size: 57.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.15

File hashes

Hashes for nahook-0.1.4.tar.gz
Algorithm Hash digest
SHA256 04f577f52cd2bc55f67cedc35f8836d81ab0208de2f5d2380e1f4aed5d4c9a0a
MD5 f334eea7c54a6889242c391953ae6786
BLAKE2b-256 2ce3c848eccba1210d5426f32f3c198040dd1496f9c890d3b1cf5d5eb52b7bbd

See more details on using hashes here.

File details

Details for the file nahook-0.1.4-py3-none-any.whl.

File metadata

  • Download URL: nahook-0.1.4-py3-none-any.whl
  • Upload date:
  • Size: 20.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.15

File hashes

Hashes for nahook-0.1.4-py3-none-any.whl
Algorithm Hash digest
SHA256 0b41bb118775a476c85305b498a0505aef39470104dc29bd454018e2003fd609
MD5 a9c9454c3cdd97543cf0605c0d486638
BLAKE2b-256 9afc646b54e92950386804854cc7d1a887f62e07693ea66c6b257a26a696ee10

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page