Skip to main content

Tinkoff Invest

Project description

T-Invest

pip install tinvest
import asyncio

import tinvest

TOKEN = "<TOKEN>"

events = tinvest.StreamingEvents()


@events.candle()
async def handle_candle(
    api: tinvest.StreamingApi, payload: tinvest.CandleStreamingSchema
):
    print(payload)


@events.orderbook()
async def handle_orderbook(
    api: tinvest.StreamingApi, payload: tinvest.OrderbookStreamingSchema
):
    print(payload)


@events.instrument_info()
async def handle_instrument_info(
    api: tinvest.StreamingApi, payload: tinvest.InstrumentInfoStreamingSchema
):
    print(payload)


@events.error()
async def handle_error(
    api: tinvest.StreamingApi, payload: tinvest.ErrorStreamingSchema
):
    print(payload)


@events.startup()
async def startup(api: tinvest.StreamingApi):
    await api.candle.subscribe("BBG0013HGFT4", "1min")
    await api.orderbook.subscribe("BBG0013HGFT4", 5, "123ASD1123")
    await api.instrument_info.subscribe("BBG0013HGFT4")


@events.cleanup()
async def cleanup(api: tinvest.StreamingApi):
    await api.candle.unsubscribe("BBG0013HGFT4", "1min")
    await api.orderbook.unsubscribe("BBG0013HGFT4", 5)
    await api.instrument_info.unsubscribe("BBG0013HGFT4")


async def main():
    await tinvest.Streaming(TOKEN, state={"postgres": ...}).add_handlers(events).run()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
import tinvest

TOKEN = "<TOKEN>"

client = tinvest.SyncClient(TOKEN)
api = tinvest.PortfolioApi(client)

response = api.portfolio_get()  # requests.Response
print(response.parse_json())  # tinvest.PortfolioResponse
# Handle error
...
api = tinvest.OperationsApi(client)

response = api.operations_get("", "", raise_for_status=False)
print(response.parse_error())  # tinvest.Error
import asyncio
import tinvest

TOKEN = "<TOKEN>"

client = tinvest.AsyncClient(TOKEN)
api = tinvest.PortfolioApi(client)


async def request():
    async with api.portfolio_get() as response:  # aiohttp.ClientResponse
        print(await response.parse_json())  # tinvest.PortfolioResponse


loop = asyncio.get_event_loop()
loop.run_until_complete(request())

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

tinvest-1.0.8.tar.gz (9.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

tinvest-1.0.8-py3-none-any.whl (10.7 kB view details)

Uploaded Python 3

File details

Details for the file tinvest-1.0.8.tar.gz.

File metadata

  • Download URL: tinvest-1.0.8.tar.gz
  • Upload date:
  • Size: 9.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.0.0 CPython/3.8.0 Windows/10

File hashes

Hashes for tinvest-1.0.8.tar.gz
Algorithm Hash digest
SHA256 be471e02963e50788e43af95a899f03ee5454821212fd3a670909e995952de3f
MD5 c73e31f368fa9fae92a37c7e93dbeaac
BLAKE2b-256 84446a8bac6576121b20de0e8ff875284d1c562f04dfc5e77d0239c5fa7577cd

See more details on using hashes here.

File details

Details for the file tinvest-1.0.8-py3-none-any.whl.

File metadata

  • Download URL: tinvest-1.0.8-py3-none-any.whl
  • Upload date:
  • Size: 10.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.0.0 CPython/3.8.0 Windows/10

File hashes

Hashes for tinvest-1.0.8-py3-none-any.whl
Algorithm Hash digest
SHA256 c03f3ce72af036254a12d60f227066a7087882558e87f46ed7a9e22d406687a0
MD5 c9ac3978c3715229f99004a105ef0819
BLAKE2b-256 c4a6faa635cdb4946a7692ff711ee60a82f38adb251fc5d0f3242f29ce90996e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page