Async agi client/server framework (asyncio)
Project description
Aioagi
Async agi client/server framework. The project based on “aiohttp” framework.
Key Features
Supports both client and server side of AGI protocol.
AGI-server has middlewares and pluggable routing.
Getting started
Server
Simple AGI server:
import asyncio
from aiohttp.web import Application, AppRunner, TCPSite, Response
from aioagi import runner
from aioagi.app import AGIApplication
from aioagi.log import agi_server_logger
from aioagi.urldispathcer import AGIView
from aiohttp.web_runner import GracefulExit
async def hello(request):
message = await request.agi.stream_file('hello-world')
await request.agi.verbose('Hello handler: {}.'.format(request.rel_url.query))
agi_server_logger.debug(message)
async def http_hello(request):
return Response(text="Hello, world")
class HelloView(AGIView):
async def sip(self):
message = await self.request.agi.stream_file('hello-world')
await self.request.agi.verbose('HelloView handler: {}.'.format(self.request.rel_url.query))
agi_server_logger.debug(message)
if __name__ == '__main__':
app = AGIApplication()
app.router.add_route('SIP', '/', hello)
runner.run_app(app)
# OR
if __name__ == '__main__':
apps = []
app = AGIApplication()
app.router.add_route('SIP', '/', hello)
http_app = Application()
http_app.router.add_route('GET', '/', http_hello)
loop = asyncio.get_event_loop()
runners = []
sites = []
for _app in [app, http_app]:
app_runner = AppRunner(_app)
loop.run_until_complete(app_runner.setup())
if isinstance(_app, AGIApplication):
sites.append(runner.AGISite(app_runner, port=8081))
else:
sites.append(TCPSite(app_runner, port=8080))
runners.append(app_runner)
for site in sites:
loop.run_until_complete(site.start())
uris = sorted(str(s.name) for runner in runners for s in runner.sites)
print("======== Running on {} ========\n"
"(Press CTRL+C to quit)".format(', '.join(uris)))
try:
loop.run_forever()
except (GracefulExit, KeyboardInterrupt): # pragma: no cover
pass
finally:
for runner in reversed(runners):
loop.run_until_complete(runner.cleanup())
if hasattr(loop, 'shutdown_asyncgens'):
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
Client
To set AGI connection as Asterisk:
import asyncio
import logging.config
from aioagi.log import agi_client_logger
from aioagi.client import AGIClientSession
from aioagi.parser import AGIMessage, AGICode
async def test_request(loop):
headers = {
'agi_channel': 'SIP/100-00000001',
'agi_language': 'ru',
'agi_uniqueid': '1532375920.8',
'agi_version': '14.0.1',
'agi_callerid': '100',
'agi_calleridname': 'test',
'agi_callingpres': '0',
'agi_callingani2': '0',
'agi_callington': '0',
'agi_callingtns': '0',
'agi_dnid': '101',
'agi_rdnis': 'unknown',
'agi_context': 'from-internal',
'agi_extension': '101',
'agi_priority': '1',
'agi_enhanced': '0.0',
'agi_accountcode': '',
'agi_threadid': '139689736754944',
}
async with AGIClientSession(headers=headers, loop=loop) as session:
async with session.sip('agi://localhost:8080/hello/?a=test1&b=var1') as response:
async for message in response:
client_logger.debug(message)
await response.send(AGIMessage(AGICode.OK, '0', {}))
async with session.sip('agi://localhost:8080/hello-view/?a=test2&b=var2') as response:
async for message in response:
client_logger.debug(message)
await response.send(AGIMessage(AGICode.OK, '0', {}))
agi_type: SIP
agi_network: yes
agi_network_script: hello/
agi_request: agi://localhost:8080/hello/
AMI
import asyncio
from aioagi.ami.action import AMIAction
from aioagi.ami.manager import AMIManager
async def callback(manager, message):
print(message)
async def main(app):
manager = AMIManager(
app=app, title='myasterisk',
host='127.0.0.1',
port=5038,
username='username',
secret='secret',
)
manager.register_event('*', callback)
app['manager'] = manager
await manager.connect()
await asyncio.sleep(2)
message = await manager.send_action(AMIAction({
'Action': 'Command',
'Command': 'database show',
}))
print(message)
print(message.body)
async def cleanup(app):
app['manager'].close()
if __name__ == '__main__':
app = {}
_loop = asyncio.get_event_loop()
try:
_loop.run_until_complete(main(app))
except KeyboardInterrupt:
_loop.run_until_complete(cleanup(app))
_loop.close()
Install
pip install aioagi
Thanks
Gael Pasgrimaud - panoramisk
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file aioagi-1.2.0.tar.gz.
File metadata
- Download URL: aioagi-1.2.0.tar.gz
- Upload date:
- Size: 30.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.23.0 setuptools/47.1.1 requests-toolbelt/0.9.1 tqdm/4.48.2 CPython/3.6.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
912c557633e3e1287f3ea78f492b6fe6dd4e461634508c3c00c9d747294c5d05
|
|
| MD5 |
c3100ea27797b67173c4655e31561d2c
|
|
| BLAKE2b-256 |
223f5ee65eca31a604b8422c36abc0254dad78581f96ad789ee47b01a51a331b
|
File details
Details for the file aioagi-1.2.0-py3-none-any.whl.
File metadata
- Download URL: aioagi-1.2.0-py3-none-any.whl
- Upload date:
- Size: 38.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.23.0 setuptools/47.1.1 requests-toolbelt/0.9.1 tqdm/4.48.2 CPython/3.6.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
f63446f24109422f7707ca9fe451b0ccdfd837e0651e0511fb198b61757b86a5
|
|
| MD5 |
90de85290d943796897b84dde11920c9
|
|
| BLAKE2b-256 |
2e9657b8fd7e32dcba2e34d17d42c8869f1cfa726554c6c6a6b93805bf7df395
|