Small aiohttp server application for TLE storage
Project description
TLEStorageService
Small aiohttp server application for TLE storage
Requirements
- aio-nasa-tle-loader >= 1.0.0
- aio-space-track-api >= 2.1.1
- aiohttp >= 2.1.0
- aiopg >= 0.13.0
- alembic >= 0.9.2
- pyaml >=16.12.2
- sqlalchemy >= 1.1.10
- ujson >= 1.35
Installing
pip install tle-storage-service
Getting started
Start aiohttp application:
#!/usr/bin/env python3 # -*- coding: utf-8 -*- import logging import os from aiohttp import web from tle_storage_service import TleStorageService, load_cfg, LOG_FORMAT DIR = os.path.abspath(os.path.dirname(__file__)) if __name__ == '__main__': logging.basicConfig(level=logging.INFO, format=LOG_FORMAT) app = TleStorageService(config=load_cfg(path=os.path.join(DIR, 'config', 'server.yaml'))) web.run_app(app, port=8080)
Create periodic script for refresh db
#!/usr/bin/env python3 # -*- coding: utf-8 -*- import asyncio import logging import os from aio_nasa_tle_loader import AsyncNasaTLELoader from aio_space_track_api import AsyncSpaceTrackApi from aiopg.sa import create_engine from tle_storage_service import periodic_task, load_cfg, nasa, space_track, LOG_FORMAT DIR = os.path.abspath(os.path.dirname(__file__)) CFG = load_cfg(path=os.path.join(DIR, 'config', 'periodic.yaml')) logger = logging.getLogger(__name__) @periodic_task(CFG['sync'].get('space-track', 4) * 3600) async def call_space_track(engine, api_params, satellites, loop): async with AsyncSpaceTrackApi(loop=loop, **api_params) as api: await space_track(engine, api, satellites) @periodic_task(CFG['sync'].get('nasa', 0.5) * 3600) async def call_nasa(engine, loop): async with AsyncNasaTLELoader(loop=loop) as loader: await nasa(engine, loader) async def run(db_params, api_params, satellites, loop): async with create_engine(loop=loop, **db_params) as engine: await asyncio.gather( call_nasa(engine, loop), call_space_track(engine, api_params, satellites, loop), loop=loop ) def notify_callback(msg): logger.info('[%s] Receive msg <- "%s"', msg.channel, msg.payload) if __name__ == '__main__': logging.basicConfig(format=LOG_FORMAT, level=logging.DEBUG) loop = asyncio.get_event_loop() loop.create_task(run(CFG['db'], CFG['space-track'], CFG['sync']['satellites'], loop)) try: loop.run_forever() finally: loop.close()
More code examples into examples directory
Source code
The latest developer version is available in a github repository: https://github.com/nkoshell/tle-storage-service
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Filename, size | File type | Python version | Upload date | Hashes |
---|---|---|---|---|
Filename, size tle_storage_service-1.0.5-py3-none-any.whl (17.5 kB) | File type Wheel | Python version py3 | Upload date | Hashes View |
Filename, size tle-storage-service-1.0.5.tar.gz (14.8 kB) | File type Source | Python version None | Upload date | Hashes View |
Close
Hashes for tle_storage_service-1.0.5-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | cb7876bb29745cc0bd7ad3ca4025b3c829bb06f6bd1ff13d29a0db00f7907bc4 |
|
MD5 | 1398edfb20aaa1e3e9997b6e190b792f |
|
BLAKE2-256 | 083af2e081d61a01ffafbd67df9d03ef90786e39f3cc80bda970fc88d06a4952 |
Close
Hashes for tle-storage-service-1.0.5.tar.gz
Algorithm | Hash digest | |
---|---|---|
SHA256 | 4926f32894a8c824f86081b86f132135be75c0006bbdda89518929eb9335a142 |
|
MD5 | 23432d45042cb606634853c8aa6e5b40 |
|
BLAKE2-256 | 359d0bada2d321301f953c0e0aa42f1eb53883e37782d2f2ed4b372fdc4166ef |