gql-subscriptions

A Python 3.7+ port of Apollo GraphQL Subscriptions.

This package contains a basic asyncio-based pubsub system, which should be used only for demos, as well as other pubsub implementations (such as Redis).

Requirements

Python 3.7+

Installation

pip install gql-subscriptions

This package should be used with a network transport, for example starlette-graphql.

Getting started with your first subscription

To begin with GraphQL subscriptions, start by defining a GraphQL Subscription type in your schema:

type Subscription {
    somethingChanged: Result
}

type Result {
    id: String
}

Next, add the Subscription type to your schema definition:

schema {
  query: Query
  mutation: Mutation
  subscription: Subscription
}

Now, let's create a PubSub instance. It is a simple pubsub implementation based on asyncio.Queue.

from gql_subscriptions import PubSub

pubsub = PubSub()
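To make the asyncio.Queue idea concrete, here is a minimal, self-contained sketch of how a queue-backed pubsub can fan payloads out to subscribers. It is not the library's source: QueuePubSubSketch, iterate, and demo are hypothetical names used only for illustration, and the real PubSub class may be organized differently.

```python
import asyncio
from collections import defaultdict
from typing import Any, AsyncIterator, DefaultDict, List


class QueuePubSubSketch:
    """Hypothetical stand-in for illustration; not the library's PubSub."""

    def __init__(self) -> None:
        # topic name -> queues of the currently registered subscribers
        self._queues: DefaultDict[str, List[asyncio.Queue]] = defaultdict(list)

    def publish(self, topic: str, payload: Any) -> None:
        # Fan out: every subscriber queue on this topic receives the payload.
        for queue in self._queues[topic]:
            queue.put_nowait(payload)

    def iterate(self, topic: str) -> AsyncIterator[Any]:
        # Register the queue eagerly so nothing published after this call is missed.
        queue: asyncio.Queue = asyncio.Queue()
        self._queues[topic].append(queue)

        async def stream() -> AsyncIterator[Any]:
            try:
                while True:
                    yield await queue.get()
            finally:
                # Runs when a started stream is closed by its consumer.
                self._queues[topic].remove(queue)

        return stream()


async def demo() -> Any:
    pubsub = QueuePubSubSketch()
    stream = pubsub.iterate("something_changed")
    pubsub.publish("something_changed", {"somethingChanged": {"id": "123"}})
    received = await stream.__anext__()
    await stream.aclose()
    return received
```

Because every subscriber owns an in-process queue, a design like this only reaches subscribers that live in the same Python process as the publisher.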

Now, implement your Subscription type resolver, using pubsub.async_iterator to map the event you need (this example uses python-gql):

from gql_subscriptions import PubSub, subscribe


pubsub = PubSub()

SOMETHING_CHANGED_TOPIC = 'something_changed'


@subscribe
async def something_changed(parent, info):
    return pubsub.async_iterator(SOMETHING_CHANGED_TOPIC)

Now the GraphQL engine knows that somethingChanged is a subscription, and every time we call pubsub.publish on this topic, the payload is delivered to subscribers over the transport in use:

pubsub.publish(SOMETHING_CHANGED_TOPIC, {'somethingChanged': {'id': "123" }})

Note that the default PubSub implementation is intended for demo purposes. It only works if you have a single instance of your server and doesn't scale beyond a couple of connections. For production usage you'll want to use one of the PubSub implementations backed by an external store (e.g. Redis).

Filters

When publishing data to subscribers, we need to make sure that each subscriber gets only the data it needs.

To do so, we can use the with_filter decorator, which wraps the subscription resolver with a filter function and lets you control each publication for each user.

from typing import Any, AsyncIterator, Awaitable, Callable, Dict

ResolverFn = Callable[[Any, Any, Dict[str, Any]], Awaitable[AsyncIterator]]
FilterFn = Callable[[Any, Any, Dict[str, Any]], bool]

def with_filter(filter_fn: FilterFn) -> Callable[[ResolverFn], ResolverFn]:
    ...

ResolverFn is an async function that returns a typing.AsyncIterator.

async def something_changed(parent, info) -> typing.AsyncIterator:
    ...

FilterFn is a filter function that is called with the payload (the published value), the operation info, and the arguments, and must return a bool.

For example, if something_changed also accepted an argument with the relevant ID, we could use the following code to filter on it:

from gql_subscriptions import PubSub, subscribe, with_filter


pubsub = PubSub()

SOMETHING_CHANGED_TOPIC = 'something_changed'


def filter_thing(payload, info, relevant_id):
    return payload['somethingChanged'].get('id') == relevant_id


@subscribe
@with_filter(filter_thing)
async def something_changed(parent, info, relevant_id):
    return pubsub.async_iterator(SOMETHING_CHANGED_TOPIC)
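To show what such a decorator does conceptually, the following is a minimal sketch of a filter wrapper around an async iterator. It is an illustrative re-implementation under stated assumptions, not the library's code: with_filter_sketch, fake_source, and collect are hypothetical names, fake_source stands in for pubsub.async_iterator, and passing the resolver arguments to the filter as keyword arguments is an assumption.

```python
import asyncio
from functools import wraps
from typing import Any, AsyncIterator, Callable


def with_filter_sketch(filter_fn: Callable[..., bool]):
    """Hypothetical re-implementation for illustration only."""

    def decorator(resolver):
        @wraps(resolver)
        async def wrapped(parent, info, **kwargs) -> AsyncIterator[Any]:
            # The resolver returns the unfiltered stream of payloads.
            source = await resolver(parent, info, **kwargs)

            async def filtered() -> AsyncIterator[Any]:
                async for payload in source:
                    # Forward only the payloads the filter accepts.
                    if filter_fn(payload, info, **kwargs):
                        yield payload

            return filtered()

        return wrapped

    return decorator


async def fake_source() -> AsyncIterator[Any]:
    # Finite stand-in for pubsub.async_iterator(...).
    for item_id in ("1", "2", "1"):
        yield {"somethingChanged": {"id": item_id}}


def filter_thing(payload, info, relevant_id):
    return payload["somethingChanged"].get("id") == relevant_id


@with_filter_sketch(filter_thing)
async def something_changed(parent, info, relevant_id):
    return fake_source()


async def collect(relevant_id: str) -> list:
    stream = await something_changed(None, None, relevant_id=relevant_id)
    return [payload async for payload in stream]
```

Calling asyncio.run(collect("1")) gathers only the payloads whose id is "1". The filter runs per subscriber, so two clients subscribed with different arguments see different subsets of the same published stream.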

Channels Mapping

You can map multiple channels into the same subscription, for example when there are multiple events that trigger the same subscription in the GraphQL engine.

from gql_subscriptions import PubSub, subscribe, with_filter

pubsub = PubSub()

SOMETHING_UPDATED = 'something_updated'
SOMETHING_CREATED = 'something_created'
SOMETHING_REMOVED = 'something_removed'


@subscribe
async def something_changed(parent, info):
    return pubsub.async_iterator([SOMETHING_UPDATED, SOMETHING_CREATED, SOMETHING_REMOVED])
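One way to picture this mapping is a single subscriber queue registered under several topic names, so that events from any of them arrive on one stream. The sketch below assumes that design. MultiTopicSketch, iterate, and demo are hypothetical names, and the library may implement multi-channel iteration differently.

```python
import asyncio
from collections import defaultdict
from typing import Any, AsyncIterator, Iterable, Union


class MultiTopicSketch:
    """Hypothetical illustration; not the library's implementation."""

    def __init__(self) -> None:
        self._queues = defaultdict(list)  # topic name -> subscriber queues

    def publish(self, topic: str, payload: Any) -> None:
        for queue in self._queues[topic]:
            queue.put_nowait(payload)

    def iterate(self, topics: Union[str, Iterable[str]]) -> AsyncIterator[Any]:
        # Accept one topic or several; the same queue is shared by all of them.
        names = [topics] if isinstance(topics, str) else list(topics)
        queue: asyncio.Queue = asyncio.Queue()
        for name in names:
            self._queues[name].append(queue)

        async def stream() -> AsyncIterator[Any]:
            try:
                while True:
                    yield await queue.get()
            finally:
                # Unregister from every topic when the stream is closed.
                for name in names:
                    self._queues[name].remove(queue)

        return stream()


async def demo() -> list:
    pubsub = MultiTopicSketch()
    stream = pubsub.iterate(["something_updated", "something_created"])
    pubsub.publish("something_created", {"somethingChanged": {"id": "1"}})
    pubsub.publish("something_removed", {"somethingChanged": {"id": "3"}})  # not subscribed
    pubsub.publish("something_updated", {"somethingChanged": {"id": "2"}})
    received = [await stream.__anext__(), await stream.__anext__()]
    await stream.aclose()
    return received
```

In this sketch the subscriber receives events in publication order across the subscribed topics, and events on topics it did not ask for never reach its queue.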

PubSub Implementations

The default PubSub can easily be replaced with another implementation of the PubSubEngine abstract class.

This package includes a Redis implementation.

from gql import subscribe
from gql_subscriptions.pubsubs.redis import RedisPubSub


pubsub = RedisPubSub()

SOMETHING_CHANGED_TOPIC = 'something_changed'


@subscribe
async def something_changed(parent, info):
    return pubsub.async_iterator(SOMETHING_CHANGED_TOPIC)

You can also implement a PubSub of your own by inheriting from the PubSubEngine class in this package; the Redis implementation serves as an example.
