Microsoft Communication JobRouter Client Library for Python
Azure Communication JobRouter Package client library for Python
This package contains a Python SDK for Azure Communication Services for JobRouter. Read more about Azure Communication Services here
Source code | Package (Pypi) | Product documentation
Disclaimer
Azure SDK Python packages support for Python 2.7 ended on 01 January 2022. For more information and questions, please refer to https://github.com/Azure/azure-sdk-for-python/issues/20691
Getting started
Prerequisites
You need an Azure subscription and a Communication Service Resource to use this package.
- Python 3.7 or later is required to use this package. For more details, please read our page on Azure SDK for Python version support policy.
- To create a new Communication Service, you can use the Azure Portal, the Azure PowerShell
Install the package
Install the Azure Communication JobRouter client library for Python with pip:
pip install azure-communication-jobrouter
Key concepts
Job
A Job represents the unit of work, which needs to be routed to an available Worker. A real-world example of this may be an incoming call or chat in the context of a call center.
Worker
A Worker represents the supply available to handle a Job. Each worker registers with one or more queues to receive jobs. A real-world example of this may be an agent working in a call center.
Queue
A Queue represents an ordered list of jobs waiting to be served by a worker. Workers will register with a queue to receive work from it. A real-world example of this may be a call queue in a call center.
Channel
A Channel represents a grouping of jobs by some type. When a worker registers to receive work, they must also specify which channels they can handle work for, and how much of each they can handle concurrently.
A real-world example of this may be voice calls or chats in a call center.
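The capacity idea above can be sketched in plain Python. This is a simplified illustration, not the service's implementation: the function name `can_take_job` and the channel names and costs are hypothetical, and it assumes each active job consumes a fixed cost from the worker's total capacity.

```python
from typing import Dict, List


def can_take_job(total_capacity: int,
                 cost_per_job: Dict[str, int],
                 active_job_channels: List[str],
                 new_job_channel: str) -> bool:
    """Check whether a worker has enough free capacity for one more job."""
    if new_job_channel not in cost_per_job:
        return False  # the worker did not register for this channel
    # Capacity already consumed by the jobs the worker is handling
    used = sum(cost_per_job[ch] for ch in active_job_channels)
    return used + cost_per_job[new_job_channel] <= total_capacity


# Hypothetical costs: one voice call uses the whole worker, a chat uses 3 of 10
costs = {"voice": 10, "chat": 3}
print(can_take_job(10, costs, [], "voice"))
print(can_take_job(10, costs, ["chat", "chat"], "chat"))
print(can_take_job(10, costs, ["chat"], "voice"))
```

With these numbers, an idle worker can take a voice call and a worker with two chats can take a third, but a worker already in a chat cannot take a voice call.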
Offer
An Offer is extended by JobRouter to a worker to handle a particular job when it determines a match. This notification is normally delivered via EventGrid. The worker can either accept or decline the offer using the JobRouter API, or the offer will expire according to the time to live configured on the distribution policy. A real-world example of this may be the ringing of an agent in a call center.
Distribution Policy
A Distribution Policy represents a configuration set that governs how jobs in a queue are distributed to workers registered with that queue. This configuration includes how long an Offer is valid before it expires and the distribution mode, which defines the order in which workers are picked when there are multiple available.
Distribution Mode
The 3 types of modes are:
- Round Robin: Workers are ordered by Id and the next worker after the previous one that got an offer is picked.
- Longest Idle: The worker that has not been working on a job for the longest.
- Best Worker: You can specify an expression to compare 2 workers to determine which one to pick.
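The first two modes can be illustrated with a short local sketch. It only mirrors the descriptions above and is not how the service is implemented: the `Worker` dataclass and both function names are hypothetical stand-ins, not SDK types.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Worker:
    # Hypothetical stand-in for a registered worker; not an SDK type.
    id: str
    idle_seconds: float  # time since the worker last worked on a job


def pick_round_robin(workers: List[Worker], last_offered_id: Optional[str]) -> Worker:
    """Order workers by id and pick the one after the last worker offered."""
    ordered = sorted(workers, key=lambda w: w.id)
    if last_offered_id is None:
        return ordered[0]
    for worker in ordered:
        if worker.id > last_offered_id:
            return worker
    return ordered[0]  # wrap around to the start of the list


def pick_longest_idle(workers: List[Worker]) -> Worker:
    """Pick the worker that has been idle the longest."""
    return max(workers, key=lambda w: w.idle_seconds)


workers = [Worker("worker-2", 30.0), Worker("worker-1", 5.0), Worker("worker-3", 120.0)]
print(pick_round_robin(workers, "worker-1").id)
print(pick_longest_idle(workers).id)
```

Best Worker is left out because it depends on a user-supplied comparison expression.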
Labels
You can attach labels to workers, jobs and queues. These are key value pairs that can be of string, number or boolean data types.
A real-world example of this may be the skill level of a particular worker or the team or geographic location.
Label Selectors
Label selectors can be attached to a job in order to target a subset of workers serving the queue. A real-world example of this may be a condition on an incoming call that the agent must have a minimum level of knowledge of a particular product.
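To make the matching idea concrete, here is a minimal local sketch of how a selector could be evaluated against a worker's labels. The operator names, the tuple shape of a selector and the `worker_matches` function are assumptions made for this illustration; they are not the SDK's types, and the service's actual matching rules may differ.

```python
import operator
from typing import Any, Dict, List, Tuple

# Hypothetical operator names for this sketch; they are not the SDK's enum.
OPERATORS = {
    "equal": operator.eq,
    "not_equal": operator.ne,
    "less_than": operator.lt,
    "greater_than": operator.gt,
}

Selector = Tuple[str, str, Any]  # (label key, operator name, value)


def worker_matches(labels: Dict[str, Any], selectors: List[Selector]) -> bool:
    """Return True only if the worker's labels satisfy every selector."""
    for key, op_name, expected in selectors:
        if key not in labels:
            return False  # a missing label can never satisfy a selector
        if not OPERATORS[op_name](labels[key], expected):
            return False
    return True


selectors = [("Some-Skill", "greater_than", 10)]
print(worker_matches({"Some-Skill": 11}, selectors))
print(worker_matches({"Some-Skill": 10}, selectors))
```

A worker with Some-Skill of 11 satisfies the selector, while a worker with exactly 10 does not.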
Classification policy
A classification policy can be used to dynamically select a queue, determine job priority and attach worker label selectors to a job by leveraging a rules engine.
Exception policy
An exception policy controls the behavior of a Job based on a trigger and executes a desired action. The exception policy is attached to a Queue so it can control the behavior of Jobs in the Queue.
Examples
Client Initialization
To initialize the JobRouter clients, you can use the connection string. Alternatively, you can use Active Directory authentication with DefaultAzureCredential.
from azure.communication.jobrouter import (
    JobRouterClient,
    JobRouterAdministrationClient
)
connection_string = "endpoint=ENDPOINT;accessKey=KEY"
router_client = JobRouterClient.from_connection_string(conn_str = connection_string)
router_admin_client = JobRouterAdministrationClient.from_connection_string(conn_str = connection_string)
Distribution Policy
Before we can create a Queue, we need a Distribution Policy.
from azure.communication.jobrouter.models import (
    LongestIdleMode,
    DistributionPolicy
)
distribution_policy: DistributionPolicy = DistributionPolicy(
    offer_expires_after_seconds = 24 * 60 * 60,
    mode = LongestIdleMode(
        min_concurrent_offers = 1,
        max_concurrent_offers = 1
    )
)
distribution_policy: DistributionPolicy = router_admin_client.upsert_distribution_policy(
    "distribution-policy-1",
    distribution_policy
)
Queue
Next, we can create the queue.
from azure.communication.jobrouter.models import (
    RouterQueue
)
queue: RouterQueue = RouterQueue(
    distribution_policy_id = "distribution-policy-1"
)
queue: RouterQueue = router_admin_client.upsert_queue(
    "queue-1",
    queue
)
Job
Now, we can submit a job directly to that queue, with a worker selector that requires the worker to have the label Some-Skill greater than 10.
from azure.communication.jobrouter.models import (
    RouterJob,
    RouterWorkerSelector,
    LabelOperator
)
router_job: RouterJob = RouterJob(
    channel_id = "my-channel",
    queue_id = "queue-1",
    channel_reference = "12345",
    priority = 1,
    requested_worker_selectors = [
        # GREATER_THAN matches the description above: Some-Skill must exceed 10
        RouterWorkerSelector(key = "Some-Skill", label_operator = LabelOperator.GREATER_THAN, value = 10)
    ]
)
job: RouterJob = router_client.upsert_job(
    "jobId-1",
    router_job
)
Worker
Now, we register a worker to receive work from that queue, with a label of Some-Skill equal to 11.
from azure.communication.jobrouter.models import (
    RouterWorker,
    RouterChannel
)
router_worker: RouterWorker = RouterWorker(
    capacity = 1,
    queues = [
        "queue-1"
    ],
    labels = {
        "Some-Skill": 11
    },
    channels = [
        RouterChannel(channel_id = "my-channel", capacity_cost_per_job = 1)
    ],
    available_for_offers = True
)
worker = router_client.upsert_worker(
    "worker-1",
    router_worker
)
Offer
We should get a RouterWorkerOfferIssued from our EventGrid subscription.
There are several different Azure services that can act as an event handler. For this scenario, we are going to assume Webhooks for event delivery. Learn more about Webhook event delivery
Once events are delivered to the event handler, we can deserialize the JSON payload into a list of events.
# Parse the JSON payload into a list of events
from azure.eventgrid import EventGridEvent
import json
# `payload` is the list of raw JSON messages delivered to the webhook
# deserialize payload into a list of typed Events
events = [EventGridEvent.from_json(json.loads(msg)) for msg in payload]
offer_id = ""
for event in events:
    if event.event_type == "Microsoft.Communication.RouterWorkerOfferIssued":
        # event.data is deserialized as a dict keyed by the event schema's field names
        offer_id = event.data["offerId"]
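The same extraction can be tried locally without any Azure dependency. The sketch below uses only the standard library and a trimmed-down, hypothetical webhook body. The field names (`eventType`, `workerId`, `offerId`) are assumed to follow the Event Grid event schema, and `find_offer_id` is a helper invented for this example.

```python
import json
from typing import Optional

OFFER_ISSUED = "Microsoft.Communication.RouterWorkerOfferIssued"

# Hypothetical, trimmed-down webhook body; real events carry more fields.
sample_body = json.dumps([
    {
        "id": "event-1",
        "eventType": OFFER_ISSUED,
        "subject": "worker/worker-1/job/jobId-1",
        "data": {"workerId": "worker-1", "jobId": "jobId-1", "offerId": "offer-1"},
    },
    {
        "id": "event-2",
        "eventType": "Microsoft.Communication.RouterJobReceived",
        "subject": "job/jobId-1/channel/my-channel",
        "data": {"jobId": "jobId-1"},
    },
])


def find_offer_id(body: str, worker_id: str) -> Optional[str]:
    """Return the offer id of the first offer issued to worker_id, if any."""
    for event in json.loads(body):
        if event.get("eventType") != OFFER_ISSUED:
            continue  # ignore every other event type
        data = event.get("data", {})
        if data.get("workerId") == worker_id:
            return data.get("offerId")
    return None


print(find_offer_id(sample_body, "worker-1"))
```

Returning None when no matching event is found lets the caller distinguish "no offer yet" from an empty offer id.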
However, we could also wait a few seconds and then query the worker directly against the JobRouter API to see if an offer was issued to it.
from azure.communication.jobrouter.models import (
    RouterWorker,
)
router_worker: RouterWorker = router_client.get_worker(worker_id = "worker-1")
for offer in router_worker.offers:
    print(f"Worker {router_worker.id} has an active offer for job {offer.job_id}")
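Rather than waiting a fixed number of seconds before querying, one option is to poll with a timeout. The helper below is a generic sketch and not part of the SDK: `poll_until` and `fake_lookup` are hypothetical names, and the fake lookup stands in for a function that would call `router_client.get_worker` and return an offer or None.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def poll_until(fetch: Callable[[], Optional[T]],
               timeout_seconds: float = 10.0,
               interval_seconds: float = 0.5) -> Optional[T]:
    """Call fetch() repeatedly until it returns a non-None value or time runs out."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        result = fetch()
        if result is not None:
            return result
        if time.monotonic() >= deadline:
            return None  # give up; the caller decides how to handle this
        time.sleep(interval_seconds)


# Stand-in for a real lookup: it "finds" an offer on the third attempt.
attempts = {"count": 0}


def fake_lookup() -> Optional[str]:
    attempts["count"] += 1
    return "offer-1" if attempts["count"] >= 3 else None


print(poll_until(fake_lookup, timeout_seconds=2.0, interval_seconds=0.01))
```

In real use, `fetch` could be a small function that queries the worker and returns its first offer, or None while the offers list is empty.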
Accept an offer
Once a worker receives an offer, it can take two possible actions: accept or decline. We are going to accept the offer.
from azure.communication.jobrouter.models import (
    RouterJobOffer,
    AcceptJobOfferResult,
    RouterJobStatus
)
# fetching the offer id
job_offer: RouterJobOffer = [offer for offer in router_worker.offers if offer.job_id == "jobId-1"][0]
offer_id = job_offer.offer_id
# accepting the offer sent to `worker-1`
accept_job_offer_result: AcceptJobOfferResult = router_client.accept_job_offer(
    worker_id = "worker-1",
    offer_id = offer_id
)
print(f"Offer: {job_offer.offer_id} sent to worker: {router_worker.id} has been accepted")
print(f"Job has been assigned to worker: {router_worker.id} with assignment: {accept_job_offer_result.assignment_id}")
# verify job assignment is populated when querying job
updated_job = router_client.get_job(job_id = "jobId-1")
print(f"Job assignment has been successful: {updated_job.job_status == RouterJobStatus.ASSIGNED and accept_job_offer_result.assignment_id in updated_job.assignments}")
Completing a job
Once the worker is done with the job, the worker has to mark the job as completed.
import datetime
from azure.communication.jobrouter.models import (
    CompleteJobOptions
)
complete_job_result = router_client.complete_job(
    "jobId-1",
    accept_job_offer_result.assignment_id,
    CompleteJobOptions(
        note = f"Job has been completed by {router_worker.id} at {datetime.datetime.utcnow()}"
    )
)
print(f"Job has been successfully completed.")
Closing a job
After a job has been completed, the worker can perform wrap-up actions on the job before closing it and finally releasing its capacity to accept more incoming jobs.
from azure.communication.jobrouter.models import (
    RouterJob,
    RouterJobStatus,
    CloseJobOptions,
)
close_job_result = router_client.close_job(
    "jobId-1",
    accept_job_offer_result.assignment_id,
    CloseJobOptions(
        note = f"Job has been closed by {router_worker.id} at {datetime.datetime.utcnow()}"
    )
)
print(f"Job has been successfully closed.")
update_job: RouterJob = router_client.get_job(job_id = "jobId-1")
print(f"Updated job status: {update_job.job_status == RouterJobStatus.CLOSED}")
Optionally, a job can be scheduled to close at a future time by setting close_at.
import time
from datetime import datetime, timedelta
from azure.communication.jobrouter.models import (
    RouterJob,
    RouterJobStatus,
    CloseJobOptions,
)
close_job_in_future_result = router_client.close_job(
    "jobId-1",
    accept_job_offer_result.assignment_id,
    CloseJobOptions(
        note = f"Job has been closed by {router_worker.id} at {datetime.utcnow()}",
        close_at = datetime.utcnow() + timedelta(seconds = 2)
    )
)
print(f"Job has been marked to close")
# time.sleep takes its duration as a positional argument only
time.sleep(2)
update_job: RouterJob = router_client.get_job(job_id = "jobId-1")
print(f"Updated job status: {update_job.job_status == RouterJobStatus.CLOSED}")
Troubleshooting
Running into issues? This section should contain details as to what to do there.
Next steps
More sample code
Please take a look at the samples directory for detailed examples of how to use this library.
Provide Feedback
If you encounter any bugs or have suggestions, please file an issue in the Issues section of the project.
Contributing
This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit cla.microsoft.com.
This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.