Async agi client/server framework (asyncio)
Project description
Aioagi
Async agi client/server framework. The project based on “aiohttp” framework.
Key Features
Supports both client and server side of AGI protocol.
AGI-server has middlewares and pluggable routing.
Getting started
Server
Simple AGI server:
import asyncio
from aiohttp.web import Application, AppRunner, TCPSite, Response
from aioagi import runner
from aioagi.app import AGIApplication
from aioagi.log import agi_server_logger
from aioagi.urldispathcer import AGIView
from aiohttp.web_runner import GracefulExit
async def hello(request):
message = await request.agi.stream_file('hello-world')
await request.agi.verbose('Hello handler: {}.'.format(request.rel_url.query))
agi_server_logger.debug(message)
async def http_hello(request):
return Response(text="Hello, world")
class HelloView(AGIView):
async def sip(self):
message = await self.request.agi.stream_file('hello-world')
await self.request.agi.verbose('HelloView handler: {}.'.format(self.request.rel_url.query))
agi_server_logger.debug(message)
if __name__ == '__main__':
app = AGIApplication()
app.router.add_route('SIP', '/', hello)
runner.run_app(app)
# OR
if __name__ == '__main__':
apps = []
app = AGIApplication()
app.router.add_route('SIP', '/', hello)
http_app = Application()
http_app.router.add_route('GET', '/', http_hello)
loop = asyncio.get_event_loop()
runners = []
sites = []
for _app in [app, http_app]:
app_runner = AppRunner(_app)
loop.run_until_complete(app_runner.setup())
if isinstance(_app, AGIApplication):
sites.append(runner.AGISite(app_runner, port=8081))
else:
sites.append(TCPSite(app_runner, port=8080))
runners.append(app_runner)
for site in sites:
loop.run_until_complete(site.start())
uris = sorted(str(s.name) for runner in runners for s in runner.sites)
print("======== Running on {} ========\n"
"(Press CTRL+C to quit)".format(', '.join(uris)))
try:
loop.run_forever()
except (GracefulExit, KeyboardInterrupt): # pragma: no cover
pass
finally:
for runner in reversed(runners):
loop.run_until_complete(runner.cleanup())
if hasattr(loop, 'shutdown_asyncgens'):
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
Client
To set AGI connection as Asterisk:
import asyncio
import logging.config
from aioagi.log import agi_client_logger
from aioagi.client import AGIClientSession
from aioagi.parser import AGIMessage, AGICode
async def test_request(loop):
headers = {
'agi_channel': 'SIP/100-00000001',
'agi_language': 'ru',
'agi_uniqueid': '1532375920.8',
'agi_version': '14.0.1',
'agi_callerid': '100',
'agi_calleridname': 'test',
'agi_callingpres': '0',
'agi_callingani2': '0',
'agi_callington': '0',
'agi_callingtns': '0',
'agi_dnid': '101',
'agi_rdnis': 'unknown',
'agi_context': 'from-internal',
'agi_extension': '101',
'agi_priority': '1',
'agi_enhanced': '0.0',
'agi_accountcode': '',
'agi_threadid': '139689736754944',
}
async with AGIClientSession(headers=headers, loop=loop) as session:
async with session.sip('agi://localhost:8080/hello/?a=test1&b=var1') as response:
async for message in response:
client_logger.debug(message)
await response.send(AGIMessage(AGICode.OK, '0', {}))
async with session.sip('agi://localhost:8080/hello-view/?a=test2&b=var2') as response:
async for message in response:
client_logger.debug(message)
await response.send(AGIMessage(AGICode.OK, '0', {}))
agi_type: SIP
agi_network: yes
agi_network_script: hello/
agi_request: agi://localhost:8080/hello/
AMI
import asyncio
from aioagi.ami.action import AMIAction
from aioagi.ami.manager import AMIManager
async def callback(manager, message):
print(message)
async def main(app):
manager = AMIManager(
app=app, title='myasterisk',
host='127.0.0.1',
port=5038,
username='username',
secret='secret',
)
manager.register_event('*', callback)
app['manager'] = manager
await manager.connect()
await asyncio.sleep(2)
message = await manager.send_action(AMIAction({
'Action': 'Command',
'Command': 'database show',
}))
print(message)
print(message.body)
async def cleanup(app):
app['manager'].close()
if __name__ == '__main__':
app = {}
_loop = asyncio.get_event_loop()
try:
_loop.run_until_complete(main(app))
except KeyboardInterrupt:
_loop.run_until_complete(cleanup(app))
_loop.close()
Install
pip install aioagi
Thanks
Gael Pasgrimaud - panoramisk
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
aioagi-1.2.0.tar.gz
(30.2 kB
view details)
Built Distribution
aioagi-1.2.0-py3-none-any.whl
(38.5 kB
view details)
File details
Details for the file aioagi-1.2.0.tar.gz
.
File metadata
- Download URL: aioagi-1.2.0.tar.gz
- Upload date:
- Size: 30.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.23.0 setuptools/47.1.1 requests-toolbelt/0.9.1 tqdm/4.48.2 CPython/3.6.9
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 912c557633e3e1287f3ea78f492b6fe6dd4e461634508c3c00c9d747294c5d05 |
|
MD5 | c3100ea27797b67173c4655e31561d2c |
|
BLAKE2b-256 | 223f5ee65eca31a604b8422c36abc0254dad78581f96ad789ee47b01a51a331b |
File details
Details for the file aioagi-1.2.0-py3-none-any.whl
.
File metadata
- Download URL: aioagi-1.2.0-py3-none-any.whl
- Upload date:
- Size: 38.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.23.0 setuptools/47.1.1 requests-toolbelt/0.9.1 tqdm/4.48.2 CPython/3.6.9
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | f63446f24109422f7707ca9fe451b0ccdfd837e0651e0511fb198b61757b86a5 |
|
MD5 | 90de85290d943796897b84dde11920c9 |
|
BLAKE2b-256 | 2e9657b8fd7e32dcba2e34d17d42c8869f1cfa726554c6c6a6b93805bf7df395 |