pyami_asterisk is a library based on python’s AsyncIO with Asterisk AMI
Project description
AsyncIO python library with Asterisk AMI
Pyami_asterisk is a library based on python’s AsyncIO with Asterisk AMI
Install
Install pyami_asterisk
pip install pyami-asterisk
Usage
Asterisk AMI Listen all Events
import asyncio
from pyami_asterisk import AMIClient
def all_events(events):
print(events)
async def hangup_call(events):
"""asynchronous callbacks"""
await asyncio.sleep(3)
print(events)
ami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')
ami.register_event(["*"], all_events)
ami.register_event(["Hangup"], hangup_call)
ami.connect()
Asterisk AMI Listen Events: Registry, ContactStatus, PeerStatus
from pyami_asterisk import AMIClient
def register_multiple_events(events):
print(events)
async def callback_peer_status(events):
"""
Response event:
{
'Event': 'PeerStatus',
'Privilege': 'system,all',
'ChannelType': 'PJSIP',
'Peer': 'PJSIP/888',
'PeerStatus': 'Unreachable',
}
"""
if events.get('PeerStatus') == 'Unreachable':
await asyncio.sleep(2)
print(events)
async def ping(events):
"""asynchronous callbacks"""
await asyncio.sleep(3)
print(events)
ami.create_action({"Action": "Ping"}, ping)
ami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')
ami.register_event(patterns=["Registry", "ContactStatus"], callbacks=register_multiple_events)
ami.register_event(["PeerStatus"], callback_peer_status)
ami.connect()
Asterisk AMI Actions: CoreSettings
import asyncio
from pyami_asterisk import AMIClient
def core_settings(events):
print(events)
async def core_status(events):
"""asynchronous callbacks"""
await asyncio.sleep(4)
print(events)
ami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')
ami.create_action({"Action": "CoreSettings"}, core_settings)
ami.create_action({"Action": "CoreStatus"}, core_status)
ami.connect()
Asterisk AMI Actions: CoreSettings, CoreStatus (repeat 3 seconds)
from pyami_asterisk import AMIClient
def core_settings(events):
print(events)
def core_status(events):
print(events)
print(events['CoreCurrentCalls'])
ami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')
ami.create_action({"Action": "CoreSettings"}, core_settings)
ami.create_action({"Action": "CoreStatus"}, core_status, repeat=3)
ami.connect()
Asterisk AMI Action Originate
from pyami_asterisk import AMIClient
def callback_originate(events):
print(events)
ami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')
ami.create_action(
{
"Action": "Originate",
"Channel": "pjsip/203",
"Timeout": "20000",
"CallerID": "+37529XXXXXXX <203>",
"Exten": "+37529XXXXXXX",
"Context": "from-internal",
"Async": "true",
"Variable": r"PJSIP_HEADER(add,Call-Info)=\;Answer-After=0", # or multiple Variable ['var=1', 'var=2']
"Priority": "1",
},
callback_originate,
)
ami.connect()
Asterisk AMI Listen Events + Action
from pyami_asterisk import AMIClient
def callback_peer_status(events):
def callback_ping(response_ping):
print("Response Ping", response_ping)
print("PeerStatus", events)
ami.create_action({"Action": "Ping"}, callback_ping)
ami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')
ami.register_event(["PeerStatus"], callback_peer_status)
ami.connect()
Create asyncio task
import asyncio
import random
from pyami_asterisk import AMIClient
async def refresh_tokens(timeout=4):
"""Example: Refresh tokens"""
while True:
print(random.randrange(0, 1000))
await asyncio.sleep(timeout)
ami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')
ami.create_asyncio_task(tasks=[refresh_tokens(timeout=2)])
ami.connect()
Run / stop async
import asyncio
from pyami_asterisk import AMIClient
async def all_events(event):
print(event)
if event.get('Event') == 'SuccessfulAuth':
# connection close
await ami.connection_close()
async def run_async():
await asyncio.sleep(2)
await ami.connect_ami()
ami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')
ami.register_event(["*"], all_events)
asyncio.run(run_async())
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file pyami_asterisk-1.5.2.tar.gz.
File metadata
- Download URL: pyami_asterisk-1.5.2.tar.gz
- Upload date:
- Size: 6.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.9.6
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
d9e0b1a22084c9a21675f72c3dd39a422f442cbab4f5cf42ec6718626a0eece9
|
|
| MD5 |
7e75b388b868ca326e9b6661ff30c60a
|
|
| BLAKE2b-256 |
0ff0db7e88ae43a0d317e69d238daaefcd5014f4c2592258fe62b876bc8ea3e5
|
File details
Details for the file pyami_asterisk-1.5.2-py3-none-any.whl.
File metadata
- Download URL: pyami_asterisk-1.5.2-py3-none-any.whl
- Upload date:
- Size: 6.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.9.6
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b759ad67d36e14c24a155e80b1116c85685f8cba75ed935e5528d165aba4c291
|
|
| MD5 |
76c5099de295603f0fd9947ee5087235
|
|
| BLAKE2b-256 |
ab8a4571b5a0a2e1d3dd87a8e944f77aca9583e2f1429c4be43856e77452e8e6
|