gql-subscriptions
A Python 3.7+ port of Apollo GraphQL Subscriptions.
This package contains a basic asyncio-based pubsub system, which should be used only for demos, as well as other pubsub implementations (such as Redis).
Requirements
Python 3.7+
Installation
pip install gql-subscriptions
This package should be used together with a network transport, for example starlette-graphql.
Getting started with your first subscription
To begin with GraphQL subscriptions, start by defining a GraphQL Subscription type in your schema:
type Subscription {
  somethingChanged: Result
}

type Result {
  id: String
}
Next, add the Subscription type to your schema definition:
schema {
  query: Query
  mutation: Mutation
  subscription: Subscription
}
Now, let's create a PubSub instance. It is a simple pubsub implementation based on asyncio.Queue.
from gql_subscriptions import PubSub
pubsub = PubSub()
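To make the idea of a queue-based pubsub concrete, here is a minimal standalone sketch. It is not the package's PubSub class: `QueuePubSub`, `iterate`, and `demo` are hypothetical names chosen for illustration, and the real implementation may be organised differently.

```python
import asyncio
from collections import defaultdict
from typing import Any, AsyncIterator, DefaultDict, List


class QueuePubSub:
    """Hypothetical stand-in for illustration; not the package's PubSub class."""

    def __init__(self) -> None:
        # One list of subscriber queues per topic.
        self._queues: DefaultDict[str, List[asyncio.Queue]] = defaultdict(list)

    def subscribe(self, topic: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._queues[topic].append(queue)
        return queue

    def publish(self, topic: str, payload: Any) -> None:
        # Fan the payload out to every queue subscribed to this topic.
        for queue in self._queues[topic]:
            queue.put_nowait(payload)


async def iterate(queue: asyncio.Queue) -> AsyncIterator[Any]:
    # Expose a queue as an async iterator, the shape a subscription resolver returns.
    while True:
        yield await queue.get()


async def demo() -> List[Any]:
    pubsub = QueuePubSub()
    events = iterate(pubsub.subscribe('something_changed'))
    pubsub.publish('something_changed', {'id': '1'})
    pubsub.publish('something_changed', {'id': '2'})
    received = [await events.__anext__(), await events.__anext__()]
    await events.aclose()
    return received
```

Because every queue lives in the memory of one process, a design like this cannot share events between several server instances.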
Now, implement your Subscription type resolver, using pubsub.async_iterator to map the event you need (this uses python-gql):
from gql_subscriptions import PubSub, subscribe

pubsub = PubSub()
SOMETHING_CHANGED_TOPIC = 'something_changed'


@subscribe
async def something_changed(parent, info):
    return pubsub.async_iterator(SOMETHING_CHANGED_TOPIC)
Now, the GraphQL engine knows that somethingChanged is a subscription, and every time we call pubsub.publish on this topic, the payload is delivered to subscribers over the transport in use:
pubsub.publish(SOMETHING_CHANGED_TOPIC, {'somethingChanged': {'id': '123'}})
Note that the default PubSub implementation is intended for demo purposes. It only works if you have a single instance of your server and doesn't scale beyond a couple of connections. For production usage you'll want to use one of the PubSub implementations backed by an external store (e.g. Redis).
Filters
When publishing data to subscribers, we need to make sure that each subscriber gets only the data it needs.
To do so, we can use the with_filter decorator, which wraps the subscription resolver with a filter function and lets you control each publication for each user.
from typing import Any, AsyncIterator, Awaitable, Callable, Dict

ResolverFn = Callable[[Any, Any, Dict[str, Any]], Awaitable[AsyncIterator]]
FilterFn = Callable[[Any, Any, Dict[str, Any]], bool]

def with_filter(filter_fn: FilterFn) -> Callable[[ResolverFn], ResolverFn]:
    ...
ResolverFn is an async function that returns a typing.AsyncIterator.
async def something_changed(parent, info) -> typing.AsyncIterator: ...
FilterFn is a filter function. It is executed with the payload (the published value), the operation info, and the arguments, and it must return a bool.
For example, if something_changed also accepted an argument with the relevant ID, we could use the following code to filter by it:
from gql_subscriptions import PubSub, subscribe, with_filter

pubsub = PubSub()
SOMETHING_CHANGED_TOPIC = 'something_changed'


def filter_thing(payload, info, relevant_id):
    return payload['somethingChanged'].get('id') == relevant_id


@subscribe
@with_filter(filter_thing)
async def something_changed(parent, info, relevant_id):
    return pubsub.async_iterator(SOMETHING_CHANGED_TOPIC)
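Conceptually, filtering a subscription means wrapping one async iterator in another that forwards only matching payloads. The sketch below illustrates that idea in isolation. It is not how with_filter is implemented in this package, and `filtered`, `published_events`, and `collect` are hypothetical helpers.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, List


async def filtered(
    source: AsyncIterator[Any], predicate: Callable[[Any], bool]
) -> AsyncIterator[Any]:
    # Forward only the payloads the predicate accepts.
    async for payload in source:
        if predicate(payload):
            yield payload


async def published_events() -> AsyncIterator[Any]:
    # Stand-in for pubsub.async_iterator(...): a fixed stream of payloads.
    for thing_id in ('123', '456', '123'):
        yield {'somethingChanged': {'id': thing_id}}


async def collect(relevant_id: str) -> List[Any]:
    def matches(payload: Any) -> bool:
        return payload['somethingChanged'].get('id') == relevant_id

    return [payload async for payload in filtered(published_events(), matches)]
```

Running `asyncio.run(collect('123'))` keeps the two payloads whose id is '123' and drops the one with id '456'.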
Channels Mapping
You can map multiple channels into the same subscription, for example when there are multiple events that trigger the same subscription in the GraphQL engine.
from gql_subscriptions import PubSub, subscribe, with_filter

pubsub = PubSub()
SOMETHING_UPDATED = 'something_updated'
SOMETHING_CREATED = 'something_created'
SOMETHING_REMOVED = 'something_removed'


@subscribe
async def something_changed(parent, info):
    return pubsub.async_iterator([SOMETHING_UPDATED, SOMETHING_CREATED, SOMETHING_REMOVED])
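One way to picture channel mapping is a single queue registered under several topics, so events from any of them arrive in one stream. The following standalone sketch assumes that design. `MultiTopicPubSub` and `demo` are hypothetical, and the package may merge channels differently.

```python
import asyncio
from collections import defaultdict
from typing import Any, DefaultDict, List, Sequence


class MultiTopicPubSub:
    """Hypothetical sketch; not the package's PubSub class."""

    def __init__(self) -> None:
        self._queues: DefaultDict[str, List[asyncio.Queue]] = defaultdict(list)

    def subscribe(self, topics: Sequence[str]) -> asyncio.Queue:
        # Register the same queue under every topic so all events share one stream.
        queue: asyncio.Queue = asyncio.Queue()
        for topic in topics:
            self._queues[topic].append(queue)
        return queue

    def publish(self, topic: str, payload: Any) -> None:
        for queue in self._queues[topic]:
            queue.put_nowait(payload)


async def demo() -> List[Any]:
    pubsub = MultiTopicPubSub()
    queue = pubsub.subscribe(['something_updated', 'something_created'])
    pubsub.publish('something_created', {'event': 'created'})
    pubsub.publish('something_removed', {'event': 'removed'})  # no subscriber for this topic
    pubsub.publish('something_updated', {'event': 'updated'})
    return [await queue.get(), await queue.get()]
```

Events keep their publication order across topics because they all pass through the same queue.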
PubSub Implementations
The default PubSub can easily be replaced with another implementation of the PubSubEngine abstract class.
This package includes a Redis implementation.
from gql import subscribe
from gql_subscriptions.pubsubs.redis import RedisPubSub

pubsub = RedisPubSub()
SOMETHING_CHANGED_TOPIC = 'something_changed'


@subscribe
async def something_changed(parent, info):
    return pubsub.async_iterator(SOMETHING_CHANGED_TOPIC)
You can also implement a PubSub of your own by inheriting from the PubSubEngine class in this package; the Redis implementation serves as an example.
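As a rough illustration of what a custom engine involves, the sketch below defines its own abstract base class modelled on the publish, subscribe, and unsubscribe methods of Apollo's graphql-subscriptions PubSubEngine. `EngineSketch` and `InMemoryEngine` are hypothetical, and the abstract methods and signatures of this package's PubSubEngine may differ, so check its source before subclassing.

```python
import abc
import asyncio
from typing import Any, Callable, Dict, List, Tuple


class EngineSketch(abc.ABC):
    """Hypothetical interface; check the package's PubSubEngine for the real one."""

    @abc.abstractmethod
    async def publish(self, trigger: str, payload: Any) -> None: ...

    @abc.abstractmethod
    async def subscribe(self, trigger: str, on_message: Callable[[Any], None]) -> int: ...

    @abc.abstractmethod
    async def unsubscribe(self, sub_id: int) -> None: ...


class InMemoryEngine(EngineSketch):
    def __init__(self) -> None:
        # Maps subscription id -> (trigger, callback).
        self._subs: Dict[int, Tuple[str, Callable[[Any], None]]] = {}
        self._next_id = 0

    async def publish(self, trigger: str, payload: Any) -> None:
        # Deliver the payload to every callback registered for this trigger.
        for sub_trigger, on_message in list(self._subs.values()):
            if sub_trigger == trigger:
                on_message(payload)

    async def subscribe(self, trigger: str, on_message: Callable[[Any], None]) -> int:
        self._next_id += 1
        self._subs[self._next_id] = (trigger, on_message)
        return self._next_id

    async def unsubscribe(self, sub_id: int) -> None:
        self._subs.pop(sub_id, None)


async def demo() -> List[Any]:
    engine = InMemoryEngine()
    received: List[Any] = []
    sub_id = await engine.subscribe('something_changed', received.append)
    await engine.publish('something_changed', {'id': '123'})
    await engine.unsubscribe(sub_id)
    await engine.publish('something_changed', {'id': '456'})  # no subscribers left
    return received
```

A store-backed engine would follow the same shape, with publish and subscribe delegating to the external store instead of an in-process dictionary.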