Small aiohttp server application for TLE storage
Project description
TLEStorageService
Small aiohttp server application for TLE storage
Requirements
aio-nasa-tle-loader >= 1.0.0
aio-space-track-api >= 2.1.1
aiohttp >= 2.1.0
aiopg >= 0.13.0
alembic >= 0.9.2
pyaml >=16.12.2
sqlalchemy >= 1.1.10
ujson >= 1.35
Installing
pip install tle-storage-service
Getting started
Start aiohttp application:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import logging
import os
from aiohttp import web
from tle_storage_service import TleStorageService, load_cfg, LOG_FORMAT
DIR = os.path.abspath(os.path.dirname(__file__))
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
app = TleStorageService(config=load_cfg(path=os.path.join(DIR, 'config', 'server.yaml')))
web.run_app(app, port=8080)
Create periodic script for refresh db
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import logging
import os
from aio_nasa_tle_loader import AsyncNasaTLELoader
from aio_space_track_api import AsyncSpaceTrackApi
from aiopg.sa import create_engine
from tle_storage_service import periodic_task, load_cfg, nasa, space_track, LOG_FORMAT
DIR = os.path.abspath(os.path.dirname(__file__))
CFG = load_cfg(path=os.path.join(DIR, 'config', 'periodic.yaml'))
logger = logging.getLogger(__name__)
@periodic_task(CFG['sync'].get('space-track', 4) * 3600)
async def call_space_track(engine, api_params, satellites, loop):
async with AsyncSpaceTrackApi(loop=loop, **api_params) as api:
await space_track(engine, api, satellites)
@periodic_task(CFG['sync'].get('nasa', 0.5) * 3600)
async def call_nasa(engine, loop):
async with AsyncNasaTLELoader(loop=loop) as loader:
await nasa(engine, loader)
async def run(db_params, api_params, satellites, loop):
async with create_engine(loop=loop, **db_params) as engine:
await asyncio.gather(
call_nasa(engine, loop),
call_space_track(engine, api_params, satellites, loop),
loop=loop
)
def notify_callback(msg):
logger.info('[%s] Receive msg <- "%s"', msg.channel, msg.payload)
if __name__ == '__main__':
logging.basicConfig(format=LOG_FORMAT, level=logging.DEBUG)
loop = asyncio.get_event_loop()
loop.create_task(run(CFG['db'],
CFG['space-track'],
CFG['sync']['satellites'],
loop))
try:
loop.run_forever()
finally:
loop.close()
More code examples into examples directory
Source code
The latest developer version is available in a github repository: https://github.com/nkoshell/tle-storage-service
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
tle-storage-service-1.0.0.tar.gz
(25.8 kB
view hashes)
Close
Hashes for tle-storage-service-1.0.0.tar.gz
Algorithm | Hash digest | |
---|---|---|
SHA256 | fa5a408ee99556b683b9678ae065668325594149932d5c31ce7451e1a6a565a0 |
|
MD5 | 4b1a15d09c18012ad11860f6a86907a7 |
|
BLAKE2b-256 | 8b00e54e0da136a67ba94231638f4ebae027f07628ef68e75d9258d84b9f776e |