Faktory Client Python (Producer and Consumer/Worker)
Project description
Faktory Client Python (faktory_worker_python)
This repository provides Python Client (Consumer and Producer) for the Faktory background job server.
+--------------------+
| |
| Faktory |
| Server |
+---->>>>| +>>>>----+
| | | |
| | | |
| +--------------------+ |
| |
| |
| |
| |
+-----------------------------------------------------------+
| . Faktory . |
| . Client . |
| . . |
| +-----------------+ +-----------------+ |
| | | | | |
| | Producer | | Consumer | |
| | pushes | | (Worker) | |
| | jobs | | fetches | |
| | | | jobs | |
| | | | | |
| +-----------------+ +-----------------+ |
| |
+-----------------------------------------------------------+
- Server - the Faktory daemon which stores background jobs in queues to be processed by Workers.
- Client - an entity that communicates with the Faktory server using the FWP. A single client can act as both a consumer and a producer.
- Consumer (or Worker) - a client that fetches work units (jobs) from the server for execution.
- Producer - a client that issues new work units to the server.
This library tries to implement the FWP as well as possible. If you notice any inconsistencies, please report them.
Installation
pip install pyfaktory
Usage
Faktory Server
If you have a Faktory server already running, make sure you have the correct url.
# Default url for a Faktory server running locally
faktory_server_url = 'tcp://localhost:7419'
For the installation of faktory, please refer to the official documentation.
After installation, you can run it locally.
$ /usr/bin/faktory
Faktory 1.8.0
You can use a password for the Faktory server via the environment variable FAKTORY_PASSWORD
. Note if this value starts with a /
, then it is considered a pointer to a file on the filesystem with the password. By default /etc/faktory/password
is used.
The format of the Faktory URL is as follows:
tcp://:password@localhost:7419
You can access the Fakotry GUI.
To run Faktory in production:
/usr/bin/faktory -e production
Faktory in production mode requires a password by default since version 0.7.0.
Faktory Client
Import pyfaktory
.
from pyfaktory import Client, Consumer, Job, Producer
A single client can act as both a consumer and a producer.
client = Client(faktory_url='tcp://localhost:7419')
client.connect()
# Now you can use the client
# At the end, disconnect the client
client.disconnect()
Client is a context manager, so you can use with
statement.
with Client(faktory_url='tcp://localhost:7419') as client:
# Use client
Use role
argument to say how you want to use the client. This argument has
three possible values: 'consumer', 'producer' or 'both'.
# A client that acts as both a consumer and a producer.
client = Client(faktory_url='tcp://localhost:7419', role='both')
Producer
Use the client to push jobs.
Push job
with Client(faktory_url='tcp://localhost:7419', role='producer') as client:
producer = Producer(client=client)
job_1 = Job(jobtype='adder', args=(5, 4), queue='default')
producer.push(job_1)
Push bulk jobs
You can push several jobs at once. There is no limit, but 1000 at a time is recommended as a best practice.
with Client(faktory_url='tcp://localhost:7419', role='producer') as client:
producer = Producer(client=client)
job_2 = Job(jobtype='adder', args=(50, 41))
job_3 = Job(jobtype='adder', args=(15, 68))
res = producer.push_bulk([job_2, job_3])
Consumer (Worker)
Use a worker to pull jobs from Faktory server and execute them.
def adder(x, y):
logging.info(f"{x} + {y} = {x + y}")
with Client(faktory_url='tcp://localhost:7419', role='consumer') as client:
consumer = Consumer(client=client, queues=['default'], concurrency=1)
consumer.register('adder', adder)
consumer.run()
Use priority
to indicates in which queue order the jobs should be fetched
first.
# With strict priority, there is a risk of starvation
consumer = Consumer(client=client, queues=['critical', 'default', 'bulk'], priority='strict')
# Each queue has an equal chance of being fetched first
consumer = Consumer(client=client, queues=['critical', 'default', 'bulk'], priority='uniform')
# Weights must be specified
consumer = Consumer(client=client, queues=['critical', 'default', 'bulk'], priority='weighted', weights=[0.6, 0.3, 0.1])
Capture exceptions using Sentry
To capture exceptions using Sentry before failling jobs
set sentry_capture_exception
argument to True
.
consumer = Consumer(client=client, sentry_capture_exception=True)
Info
You can get various information about the server using info
Client method.
with Client(faktory_url='tcp://localhost:7419') as client:
server_info = client.info()
print(server_info)
Mutate
A wrapper for the Mutate API to script certain repairs or migrations.
⚠️ MUTATE commands can be slow and/or resource intensive. They should not be used as part of your application logic.
from pyfaktory import Client, JobFilter, MutateOperation
client = Client(faktory_url='tcp://localhost:7419')
client.connect()
# Find all scheduled jobs with type `QuickbooksSyncJob` and discard them
op = MutateOperation(
cmd='discard',
target='scheduled',
filter=JobFilter(jobtype="QuickbooksSyncJob")
)
client.mutate(op)
# Clear the Retry queue completely
op = MutateOperation(
cmd='discard',
target='retries',
filter=JobFilter(regexp="*")
)
client.mutate(op)
# Clear the Retry queue completely
op = MutateOperation(
cmd='discard',
target='retries'
)
client.mutate(op)
# Send a two specific JIDs in the Retry queue to the Dead queue
op = MutateOperation(
cmd='kill',
target='retries',
filter=JobFilter(jobtype="QuickbooksSyncJob", jids=["123456789", "abcdefgh"])
)
client.mutate(op)
# Enqueue all retries with a first argument matching "bob"
op = MutateOperation(
cmd='requeue',
target='retries',
filter=JobFilter(regexp="*\"args\":[\"bob\"*")
)
client.mutate(op)
client.disconnect()
Queues
Pausing
Queues may be paused (no job can be fetched from them while paused) or unpaused (resume fetching).
# Pause a list of queues
client.queue_pause(queues=['bulk', 'another_queue'])
# Pause all queues
client.queue_pause(all_queues=True)
# Unpause a list of queues
client.queue_unpause(queues=['bulk'])
# Unpause all queues
client.queue_unpause(all_queues=True)
Remove
Queues can be removed which deletes all jobs within them. It does not stop currently executing jobs from those queues.
# Remove a list of queues
client.queue_remove(queues=['bulk'])
# Remove all queues
client.queue_remove(all_queues=True)
Batch (untested)
Refer to documentation.
from pyfaktory import Batch, Client, Job, Producer, TargetJob
client = Client(faktory_url='tcp://localhost:7419')
client.connect()
producer = Producer(client=client)
batch = Batch(
description="An optional description for the Web UI",
success=TargetJob(jobtype="MySuccessCallback", args=[123], queue="critical"),
complete=TargetJob(jobtype="MyCompleteCallback", args=['aa'], queue="critical")
)
# Create a new batch
resp = producer.batch_new(batch)
# Push as many jobs as necessary for the batch
# You may nest batches
# The initial batch data has a TTL of 30 minutes and will expire if batch is not commited
producer.push(Job(jobtype='SomeJob', args=(5, 4), custom={"bid": bid}))
producer.push(Job(jobtype='SomeOtherJob', args=(0, 15), custom={"bid": bid}))
# Commit the batch
producer.batch_commit(bid)
client.disconnect()
Use batch_open
to open a created batch.
producer.batch_open(bid)
Use parent_bid
for child batch.
child_batch = Batch(
parent_bid=bid,
description="An optional description for the Web UI",
success=TargetJob(jobtype="MySuccessCallback", args=[123], queue="critical"),
complete=TargetJob(jobtype="MyCompleteCallback", args=['aa'], queue="critical")
)
Custom command
If you want to use a Faktory command that is not yet implemented in this client library, you can send custom commands.
from pyfaktory import Client
my_command = 'INFO\r\n'
with Client(faktory_url='tcp://localhost:7419') as client:
resp = client._send_and_receive(my_command)
print(resp)
Outer multiprocessing
You can use outer multiprocessing.
def run_worker():
try:
with Client(faktory_url=faktory_server_url, role='consumer') as client:
import multiprocessing
custom_context = multiprocessing.get_context("spawn")
consumer = Consumer(client=client, queues=['t_test'], concurrency=2, context=custom_context)
consumer.register('task_001', task_001)
consumer.run()
except Exception as e:
print(f"task running error: {str(e)}")
Example
Find examples in ./examples
.
- Start the Faktory server.
/usr/bin/faktory
- Launch a producer.
python examples/fproducer.py
- Launch a consumer.
python examples/fconsumer.py
- Look at what is happening in the logs and in the Faktory Web UI.
Contribute
Issues
If you encounter a problem, please report it.
In addition to the description of your problem, report the server and client versions.
pyfaktory.__version__
/usr/bin/faktory -v
Reproduce your problem while increasing the level of debugging for both the server and the client, and report the logs.
import logging
logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s',
level=logging.DEBUG,
datefmt='%Y-%m-%d %H:%M:%S')
/usr/bin/faktory -l debug
PRs
Please, feel free to create pull requests.
License
See the LICENSE file for license rights and limitations (MIT).
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file pyfaktory-0.2.6.tar.gz
.
File metadata
- Download URL: pyfaktory-0.2.6.tar.gz
- Upload date:
- Size: 17.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.3 CPython/3.8.10 Linux/5.15.0-72-generic
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 23d3f9150ae9fb1b62ce136f290e3a191fcafe59d28f91c8199265866b07c5cc |
|
MD5 | 1f7d9a2b210c70ef02b7a04a2edfea19 |
|
BLAKE2b-256 | fde9cf8a82263cee0db22f0321fc984e0e5521f87562c4c36e78694c40424f32 |
File details
Details for the file pyfaktory-0.2.6-py3-none-any.whl
.
File metadata
- Download URL: pyfaktory-0.2.6-py3-none-any.whl
- Upload date:
- Size: 17.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.3 CPython/3.8.10 Linux/5.15.0-72-generic
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 858dee51a84c2f6dcfc67a059ed244e19e1ba6be5ede7982dcb34c2e277aeff7 |
|
MD5 | cd6371b7b13183257e699c489f3dd5bb |
|
BLAKE2b-256 | e580b9df8ba018703a90500574458210a1cda8495910cf7342eee344d0fcd12b |