Skip to main content

Exchange one normal asynchronous websocket connection to unlimited number of data streams.

Project description

AsyncWebsocketStreamInterface

AsyncWebsocketStreamInterface is an abstract class interface to be inherited to implement a certain asynchronous websocket client class.

We will take class BinanceFapiAsyncWs from repository BinanceAsyncWebsocket as example to demonstrate.


Install · Inherit· Usage ·


Install

AsyncWebsocketStreamInterface in PyPI

pip install AsyncWebsocketStreamInterface

Inherit

There are 3 abstract methods in class AsyncWebsocketStreamInterface which is enforced to be inherited: _create_ws, _ when2create_new_ws, _parse_raw_data. In BinanceFapiAsyncWs, we implement more than these 3 mainly for these reasons: this websocket is private thus it needs a apiKey, and it needs to send restful request for a listenkey and to send heartbeats regularly to maintain a stable connection. So these relative implementation will not be discussed in detail.

_create_ws

async def _create_ws(self):
    ws = await websockets.connect(self.ws_baseurl + '/ws/' + await self._generate_listenkey())
    return ws

This is intuitive. You would have to create and return a websockets client. For more information about module websockets, refer to https://pypi.org/project/websockets/.

_when2create_new_ws

async def _when2create_new_ws(self):
    listenKeyExpired_stream = self.stream_filter([{'e': 'listenKeyExpired'}])

    async def read_listenKeyExpired_stream(listenKeyExpired_stream):
        async for news in listenKeyExpired_stream:
            try:
                return
            finally:
                asyncio.create_task(listenKeyExpired_stream.close())

    read_listenKeyExpired_stream_task = asyncio.create_task(read_listenKeyExpired_stream(listenKeyExpired_stream))
    # 20小时更新连接一次,或者服务端推送消息listenKey过期
    await asyncio.create_task(
        asyncio.wait(
            [read_listenKeyExpired_stream_task, asyncio.sleep(20 * 3600)],
            return_when='FIRST_COMPLETED'))
    logger.debug('Time to update ws connection.')

This method decides when to create and update new websockets client. Every time when a websockets client is created, this method is called, and the coroutine is awaited. The coroutine is pending while the corresponding websockets client is alive. When the coroutine is done, the corresponding websockets client is closed and abandoned, and a new one is created.

In our example, 2 coroutines(task) is awaited -- "read_listenKeyExpired_stream_task" and "asyncio.sleep(20 * 3600)", and the first completing one will trigger coroutine _when2create_new_ws set done, and the websockets client updated.

_parse_raw_data

async def _parse_raw_data(self, raw_data):
    msg = json.loads(raw_data)
    return msg

This method parse the raw data from the server to some format you wish.

In our simple example, we parse the json string to the data of a certain type.

Usage

After inheriting abstract methods, we would look to how to use our inheriting class. You may have already noticed that we have inherited 3 'private' methods, which we might not even find them in our editor hint. In fact, they are not methods which you will call in your codes.

stream_filter(_filters: list = None)

Transfer one normal asynchronous websocket connection to unlimited number of data streams. stream_filter returns an asynchronous iterator. Whenever you want to set a coroutine to watch the websocket data, you could create a data stream:

async def watcher():
    stream = ws.stream_filter()
    async for news in stream:
        print(news)

Remember that you'd better explicitly close the stream when it is no longer used:

close_task = asyncio.create_task(stream.close())
...
await close_task

Parameter _filters is a list of dictionaries. If pairs of key and value of any dictionary could all be matched by some message, then the message would be filtered, and the asynchronous iterator to be returned will generate only these filtered message. For example,

stream_filter([{'a': 1}, {'b': 2, "c": 3}])

Message

{'a': 1, 'c': 2}

and

{'b': 2, 'c': 3, 'd': 4}

would be filtered. However

{'a': 2, 'c': 2}

or

{'b': 2, 'c': 4, 'd': 4}

would not.

present_ws

This property returns the current alive websockets client.

send(msg)

Send a message to the websocket server.

exit()

Gracefully exit the instance of the websocket client class.

add_handler(function_or_coroutine_function)

Add a function or coroutine function to handle the new message. This handler takes the message as the only parameter. For example:

def f(msg):
    print(msg)


async def f2(msg):
    print(msg)    

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

AsyncWebsocketStreamInterface-0.9.8.tar.gz (5.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

AsyncWebsocketStreamInterface-0.9.8-py3-none-any.whl (6.8 kB view details)

Uploaded Python 3

File details

Details for the file AsyncWebsocketStreamInterface-0.9.8.tar.gz.

File metadata

  • Download URL: AsyncWebsocketStreamInterface-0.9.8.tar.gz
  • Upload date:
  • Size: 5.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.25.1 setuptools/51.0.0.post20201207 requests-toolbelt/0.9.1 tqdm/4.55.1 CPython/3.7.7

File hashes

Hashes for AsyncWebsocketStreamInterface-0.9.8.tar.gz
Algorithm Hash digest
SHA256 f7e918ececf84135e22c843e1f64e49f96f5bc43750b45ed543696ab71a0c5c6
MD5 f800daf5e6a1e12717ad58abe26e6fb7
BLAKE2b-256 ae09a23113c5b134a7dd4a654ad23d34d4e169df165ba1603d6f308e393d9fe4

See more details on using hashes here.

File details

Details for the file AsyncWebsocketStreamInterface-0.9.8-py3-none-any.whl.

File metadata

  • Download URL: AsyncWebsocketStreamInterface-0.9.8-py3-none-any.whl
  • Upload date:
  • Size: 6.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.25.1 setuptools/51.0.0.post20201207 requests-toolbelt/0.9.1 tqdm/4.55.1 CPython/3.7.7

File hashes

Hashes for AsyncWebsocketStreamInterface-0.9.8-py3-none-any.whl
Algorithm Hash digest
SHA256 ec30fae0d2497b571a31f37552c7a38971f228429b2dbd8ba6fdbd783e48c3cf
MD5 817a5c35a225c6368330e408dfb9aae7
BLAKE2b-256 6c7fe5c877fa58d24d7bf6c515c9a631fd57c8f0a7c7f314efd55fe5150a9429

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page