Small aiohttp server application for TLE storage
Project description
TLEStorageService
Small aiohttp server application for TLE storage
Requirements
- aio-nasa-tle-loader >= 1.0.0
- aio-space-track-api >= 2.1.1
- aiohttp >= 2.1.0
- aiopg >= 0.13.0
- alembic >= 0.9.2
- pyaml >=16.12.2
- sqlalchemy >= 1.1.10
- ujson >= 1.35
Installing
pip install tle-storage-service
Getting started
Start aiohttp application:
#!/usr/bin/env python3 # -*- coding: utf-8 -*- import logging import os from aiohttp import web from tle_storage_service import TleStorageService, load_cfg, LOG_FORMAT DIR = os.path.abspath(os.path.dirname(__file__)) if __name__ == '__main__': logging.basicConfig(level=logging.INFO, format=LOG_FORMAT) app = TleStorageService(config=load_cfg(path=os.path.join(DIR, 'config', 'server.yaml'))) web.run_app(app, port=8080)
Create periodic script for refresh db
#!/usr/bin/env python3 # -*- coding: utf-8 -*- import asyncio import logging import os from aio_nasa_tle_loader import AsyncNasaTLELoader from aio_space_track_api import AsyncSpaceTrackApi from aiopg.sa import create_engine from tle_storage_service import periodic_task, load_cfg, nasa, space_track, LOG_FORMAT DIR = os.path.abspath(os.path.dirname(__file__)) CFG = load_cfg(path=os.path.join(DIR, 'config', 'periodic.yaml')) logger = logging.getLogger(__name__) @periodic_task(CFG['sync'].get('space-track', 4) * 3600) async def call_space_track(engine, api_params, satellites, loop): async with AsyncSpaceTrackApi(loop=loop, **api_params) as api: await space_track(engine, api, satellites) @periodic_task(CFG['sync'].get('nasa', 0.5) * 3600) async def call_nasa(engine, loop): async with AsyncNasaTLELoader(loop=loop) as loader: await nasa(engine, loader) async def run(db_params, api_params, satellites, loop): async with create_engine(loop=loop, **db_params) as engine: await asyncio.gather( call_nasa(engine, loop), call_space_track(engine, api_params, satellites, loop), loop=loop ) def notify_callback(msg): logger.info('[%s] Receive msg <- "%s"', msg.channel, msg.payload) if __name__ == '__main__': logging.basicConfig(format=LOG_FORMAT, level=logging.DEBUG) loop = asyncio.get_event_loop() loop.create_task(run(CFG['db'], CFG['space-track'], CFG['sync']['satellites'], loop)) try: loop.run_forever() finally: loop.close()
More code examples into examples directory
Source code
The latest developer version is available in a github repository: https://github.com/nkoshell/tle-storage-service
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Filename, size | File type | Python version | Upload date | Hashes |
---|---|---|---|---|
Filename, size tle_storage_service-1.0.7-py3-none-any.whl (17.6 kB) | File type Wheel | Python version py3 | Upload date | Hashes View |
Filename, size tle-storage-service-1.0.7.tar.gz (14.8 kB) | File type Source | Python version None | Upload date | Hashes View |
Close
Hashes for tle_storage_service-1.0.7-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | c871071927f620dc93c6e1d6844ef401824dd8fde3fdcd0e84d9172e43b41a8d |
|
MD5 | 3151e47d2fb12bcad0307dff17b9ad93 |
|
BLAKE2-256 | 0d97776a6d152edf72d3588c01cbeca922f941d61d14efd87ea7f874e1fc83be |
Close
Hashes for tle-storage-service-1.0.7.tar.gz
Algorithm | Hash digest | |
---|---|---|
SHA256 | 32a92e0bb8d8928f956e7cf18667f888f2b4193b6241058351991e7d79f6e247 |
|
MD5 | f3d1c29fa84854a8af2317c715c7558b |
|
BLAKE2-256 | 6be71939d43110a0e1a54332969b083e98529f050c84c8d2b2227caead353956 |