Python SDK for Inngest
Project description
Serverless event-driven queues, background jobs, and scheduled jobs for Python.
Works with any framework and platform.
Inngest Python SDK
🚧 Currently in beta! It hasn't been battle-tested in production environments yet.
We currently support the following frameworks (but adding a new framework is easy!):
- FastAPI
- Flask
- Tornado
Getting started
Install inngest in your project:
pip install inngest
Write a basic function and serve it (see the basic example for guidance).
Start the Dev Server (the local version of our cloud platform):
npx inngest-cli@latest dev
Browse to http://127.0.0.1:8288 and you should see your app! Visit our docs to read more about the Dev Server.
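Before opening the browser, it can help to confirm that something is listening on the Dev Server port. The snippet below is a minimal sketch using only the standard library. The helper name `dev_server_is_up` is hypothetical, and it only checks that a TCP connection succeeds on the host and port from the URL above; it does not verify that the listener is actually the Dev Server.

```python
import socket


def dev_server_is_up(
    host: str = "127.0.0.1",
    port: int = 8288,
    timeout: float = 1.0,
) -> bool:
    """Return True if a TCP connection to host:port succeeds.

    Hypothetical helper: the defaults mirror the Dev Server URL in this
    README, but any listener on that port would also return True.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False
```

Calling `dev_server_is_up()` with no arguments checks the default address; pass a different `port` if you started the Dev Server elsewhere.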
Examples
💡 You can mix async and non-async functions in the same app!
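To make the idea of mixing both kinds of functions concrete, here is a small sketch of how a caller can handle sync and async handlers uniformly. It does not use the inngest package and is not a description of the SDK's internals; `call_handler`, `run_all`, and the two greeting functions are hypothetical names used for illustration. It assumes Python 3.9+ for `asyncio.to_thread`.

```python
import asyncio
import inspect
from typing import Any, Callable, List


def greet_sync(name: str) -> str:
    # An ordinary blocking function.
    return f"hello {name}"


async def greet_async(name: str) -> str:
    # A coroutine function; sleep(0) just yields to the event loop.
    await asyncio.sleep(0)
    return f"hello {name} (async)"


async def call_handler(handler: Callable[..., Any], *args: Any) -> Any:
    """Await coroutine functions directly; run sync ones in a worker thread."""
    if inspect.iscoroutinefunction(handler):
        return await handler(*args)
    return await asyncio.to_thread(handler, *args)


def run_all(handlers: List[Callable[..., Any]], name: str) -> List[Any]:
    """Call every handler in order, regardless of whether it is async."""

    async def _main() -> List[Any]:
        return [await call_handler(h, name) for h in handlers]

    return asyncio.run(_main())
```

For example, `run_all([greet_sync, greet_async], "Ada")` calls both handlers through the same code path.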
Basic (no steps)
This is a minimal example of an Inngest function. It's missing some of our features but it's a good starting point.
import flask
import inngest.flask
import requests


@inngest.create_function(
    fn_id="find_person",
    trigger=inngest.TriggerEvent(event="app/person.find"),
)
def fetch_person(
    ctx: inngest.Context,
    step: inngest.StepSync,
) -> dict:
    person_id = ctx.event.data["person_id"]
    res = requests.get(f"https://swapi.dev/api/people/{person_id}")
    return res.json()


app = flask.Flask(__name__)
inngest_client = inngest.Inngest(app_id="flask_example")

# Register functions with the Inngest server
inngest.flask.serve(
    app,
    inngest_client,
    [fetch_person],
)

app.run(port=8000)
Step run
The following example registers a function that will:
- Get the person ID from the event
- Fetch the person with that ID
- Fetch the person's ships
- Return a summary dict
@inngest.create_function(
    fn_id="find_ships",
    trigger=inngest.TriggerEvent(event="app/ships.find"),
)
def fetch_ships(
    ctx: inngest.Context,
    step: inngest.StepSync,
) -> dict:
    """
    Find all the ships a person has.
    """
    person_id = ctx.event.data["person_id"]

    def _fetch_person() -> dict:
        res = requests.get(f"https://swapi.dev/api/people/{person_id}")
        return res.json()

    # Wrap the function with step.run to enable retries
    person = step.run("fetch_person", _fetch_person)

    def _fetch_ship(url: str) -> dict:
        res = requests.get(url)
        return res.json()

    ship_names = []
    for ship_url in person["starships"]:
        # step.run works in loops!
        ship = step.run("fetch_ship", lambda: _fetch_ship(ship_url))
        ship_names.append(ship["name"])

    return {
        "person_name": person["name"],
        "ship_names": ship_names,
    }
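The comment in the example says that wrapping a call in step.run enables retries. As a rough mental model only, the toy class below retries a handler in-process until it succeeds or runs out of attempts. `ToyStep`, `max_attempts`, and `attempts` are hypothetical names that are not part of the inngest package, and the sketch makes no claim about how the SDK schedules or performs retries internally.

```python
from typing import Callable, Dict, Optional, TypeVar

T = TypeVar("T")


class ToyStep:
    """Toy stand-in for a step object; NOT the inngest implementation."""

    def __init__(self, max_attempts: int = 3) -> None:
        if max_attempts < 1:
            raise ValueError("max_attempts must be at least 1")
        self.max_attempts = max_attempts
        # Number of times each step ID has been attempted so far.
        self.attempts: Dict[str, int] = {}

    def run(self, step_id: str, handler: Callable[[], T]) -> T:
        """Call handler, retrying on any exception up to max_attempts times."""
        last_error: Optional[Exception] = None
        for _ in range(self.max_attempts):
            self.attempts[step_id] = self.attempts.get(step_id, 0) + 1
            try:
                return handler()
            except Exception as err:
                last_error = err
        # Every attempt failed: re-raise the most recent error.
        assert last_error is not None
        raise last_error
```

With this toy, a handler that fails twice and then succeeds returns its value on the third attempt, and `attempts` records how many tries each step ID needed.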
Async function
import httpx
import inngest


@inngest.create_function(
    fn_id="find_person",
    trigger=inngest.TriggerEvent(event="app/person.find"),
)
async def fetch_person(
    ctx: inngest.Context,
    step: inngest.Step,
) -> dict:
    person_id = ctx.event.data["person_id"]
    async with httpx.AsyncClient() as client:
        res = await client.get(f"https://swapi.dev/api/people/{person_id}")
        return res.json()
Sending an event outside a function
Sometimes you want to send an event from a normal, non-Inngest function. You can do that using the client:
import inngest.flask
inngest_client = inngest.Inngest(app_id="flask_example")
inngest_client.send_sync(inngest.Event(name="app/test", data={"person_id": 1}))
If you prefer async, use the send method instead:
import asyncio
import inngest.flask

inngest_client = inngest.Inngest(app_id="flask_example")


async def main():
    await inngest_client.send(inngest.Event(name="app/test", data={"person_id": 1}))


asyncio.run(main())
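Because events leave your process, it can be useful to check the payload before handing it to the client. The sketch below assumes the event data must be JSON-serializable, which is an assumption on our part rather than something stated above. `build_event_payload` is a hypothetical helper and not part of the inngest package; it only prepares the `name` and `data` values used in the examples.

```python
import json
from typing import Any, Dict


def build_event_payload(name: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """Validate an event name and data dict before sending.

    Hypothetical helper: raises ValueError for an empty name and
    TypeError if data cannot be encoded as JSON.
    """
    if not name:
        raise ValueError("event name must be a non-empty string")
    # json.dumps raises TypeError for values JSON cannot represent.
    json.dumps(data)
    return {"name": name, "data": data}
```

The result can then be passed along in the same way as in the examples, for instance `inngest.Event(name=payload["name"], data=payload["data"])`.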
Built Distribution
Hashes for inngest-0.2.1a1-py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | 389c41c8d374e3fcb4475794de3257416c7ce76b86af0a3799f393c1c89e942b
MD5 | 00eadf51398a55efc334053843e31171
BLAKE2b-256 | 55f43e3a01070f187feb6428b7a47ae1b6ed2782975be3cec8b412906dc6c40d
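If you download the wheel manually, you can compare its SHA256 digest with the value listed above. This is a minimal sketch using only the standard library; the function names `sha256_of_file` and `matches_expected` are hypothetical, and the local file path in the usage note is an assumption about where you saved the download.

```python
import hashlib
import hmac

# SHA256 digest listed for inngest-0.2.1a1-py3-none-any.whl.
EXPECTED_SHA256 = "389c41c8d374e3fcb4475794de3257416c7ce76b86af0a3799f393c1c89e942b"


def sha256_of_file(path: str, chunk_size: int = 65536) -> str:
    """Return the hex SHA256 digest of a file, reading it in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def matches_expected(path: str, expected_hex: str = EXPECTED_SHA256) -> bool:
    """Compare the file's digest with the expected hex string."""
    return hmac.compare_digest(sha256_of_file(path), expected_hex.lower())
```

For example, `matches_expected("inngest-0.2.1a1-py3-none-any.whl")` should return True only if the file in the current directory has the listed digest.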