Skip to main content

Tinkoff Invest

Project description

T-Invest

Build Status PyPI PyPI - Python Version Codecov GitHub last commit Tinvest

pip install tinvest
import asyncio

import tinvest

TOKEN = "<TOKEN>"

events = tinvest.StreamingEvents()


@events.candle()
async def handle_candle(
    api: tinvest.StreamingApi, payload: tinvest.CandleStreamingSchema
):
    print(payload)


@events.orderbook()
async def handle_orderbook(
    api: tinvest.StreamingApi, payload: tinvest.OrderbookStreamingSchema
):
    print(payload)


@events.instrument_info()
async def handle_instrument_info(
    api: tinvest.StreamingApi, payload: tinvest.InstrumentInfoStreamingSchema
):
    print(payload)


@events.error()
async def handle_error(
    api: tinvest.StreamingApi, payload: tinvest.ErrorStreamingSchema
):
    print(payload)


@events.startup()
async def startup(api: tinvest.StreamingApi):
    await api.candle.subscribe("BBG0013HGFT4", "1min")
    await api.orderbook.subscribe("BBG0013HGFT4", 5, "123ASD1123")
    await api.instrument_info.subscribe("BBG0013HGFT4")


@events.cleanup()
async def cleanup(api: tinvest.StreamingApi):
    await api.candle.unsubscribe("BBG0013HGFT4", "1min")
    await api.orderbook.unsubscribe("BBG0013HGFT4", 5)
    await api.instrument_info.unsubscribe("BBG0013HGFT4")


async def main():
    await tinvest.Streaming(TOKEN, state={"postgres": ...}).add_handlers(events).run()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
import tinvest

TOKEN = "<TOKEN>"

client = tinvest.SyncClient(TOKEN)
api = tinvest.PortfolioApi(client)

response = api.portfolio_get()  # requests.Response
print(response.parse_json())  # tinvest.PortfolioResponse
# Handle error
...
api = tinvest.OperationsApi(client)

response = api.operations_get("", "")
print(response.parse_error())  # tinvest.Error
import asyncio
import tinvest

TOKEN = "<TOKEN>"

client = tinvest.AsyncClient(TOKEN)
api = tinvest.PortfolioApi(client)


async def request():
    async with api.portfolio_get() as response:  # aiohttp.ClientResponse
        print(await response.parse_json())  # tinvest.PortfolioResponse


loop = asyncio.get_event_loop()
loop.run_until_complete(request())

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

tinvest-1.0.12.tar.gz (9.9 kB view details)

Uploaded Source

Built Distribution

tinvest-1.0.12-py3-none-any.whl (11.2 kB view details)

Uploaded Python 3

File details

Details for the file tinvest-1.0.12.tar.gz.

File metadata

  • Download URL: tinvest-1.0.12.tar.gz
  • Upload date:
  • Size: 9.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.0.2 CPython/3.7.1 Linux/4.15.0-1028-gcp

File hashes

Hashes for tinvest-1.0.12.tar.gz
Algorithm Hash digest
SHA256 5abec61acf78faa0ec35892b6d575a0f54e162e34c23a8350fbbc2d3284d47ac
MD5 21c3667b7dd0f2ce9c50f9b7a8d11beb
BLAKE2b-256 b6d25a9fc39065cd58521a85986ef9efc4cdd83f564429e360f161b4c314193b

See more details on using hashes here.

File details

Details for the file tinvest-1.0.12-py3-none-any.whl.

File metadata

  • Download URL: tinvest-1.0.12-py3-none-any.whl
  • Upload date:
  • Size: 11.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.0.2 CPython/3.7.1 Linux/4.15.0-1028-gcp

File hashes

Hashes for tinvest-1.0.12-py3-none-any.whl
Algorithm Hash digest
SHA256 66de9128b3f78362754865317e46c5ae741a74398ca33c81c3abac046b494f42
MD5 d20c3cdd535f2c5285d76eaec3a2c6a8
BLAKE2b-256 e2ef64192fcba644e4c368d66126c12ff9b5f6d987bf7c22c93340de26defb9b

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page