
Faktory Client Python (faktory_worker_python)

This repository provides a Python client (consumer and producer) for the Faktory background job server.

                   +--------------------+
                   |                    |
                   |     Faktory        |
                   |     Server         |
          +---->>>>|                    +>>>>----+
          |        |                    |        |
          |        |                    |        |
          |        +--------------------+        |
          |                                      |
          |                                      |
          |                                      |
          |                                      |
+-----------------------------------------------------------+     
|         .                Faktory               .          |            
|         .                Client                .          |  
|         .                                      .          |          
|   +-----------------+               +-----------------+   |
|   |                 |               |                 |   |
|   |    Producer     |               |    Consumer     |   |
|   |     pushes      |               |    (Worker)     |   |
|   |      jobs       |               |     fetches     |   |
|   |                 |               |       jobs      |   |
|   |                 |               |                 |   |
|   +-----------------+               +-----------------+   |          
|                                                           |            
+-----------------------------------------------------------+            
  • Server - the Faktory daemon which stores background jobs in queues to be processed by Workers.
  • Client - an entity that communicates with the Faktory server using the Faktory Work Protocol (FWP). A single client can act as both a consumer and a producer.
  • Consumer (or Worker) - a client that fetches work units (jobs) from the server for execution.
  • Producer - a client that issues new work units to the server.

This library aims to implement the FWP as faithfully as possible. If you notice any inconsistencies, please report them.

Installation

pip install pyfaktory

Usage

Faktory Server

If you have a Faktory server already running, make sure you have the correct URL.

# Default url for a Faktory server running locally
faktory_server_url = 'tcp://localhost:7419'

For the installation of Faktory, please refer to the official documentation.

After installation, you can run it locally.

$ /usr/bin/faktory
Faktory 1.8.0

You can set a password for the Faktory server via the environment variable FAKTORY_PASSWORD. Note that if this value starts with a /, it is treated as a path to a file on the filesystem containing the password. By default, /etc/faktory/password is used.

The format of the Faktory URL is as follows:

tcp://:password@localhost:7419
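
As a minimal sketch (assuming the same FAKTORY_PASSWORD environment variable is visible to your Python process), the URL can be built like this:

import os

# Build the Faktory URL using the password from the environment
# (host and port here are the local defaults)
password = os.environ.get('FAKTORY_PASSWORD', '')
faktory_server_url = f'tcp://:{password}@localhost:7419'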

You can also access the Faktory Web UI, which listens on port 7420 by default.

To run Faktory in production:

/usr/bin/faktory -e production

Faktory in production mode requires a password by default since version 0.7.0.

Faktory Client

Import pyfaktory.

from pyfaktory import Client, Consumer, Job, Producer

A single client can act as both a consumer and a producer.

client = Client(faktory_url='tcp://localhost:7419')
client.connect()

# Now you can use the client

# At the end, disconnect the client
client.disconnect()

Client is a context manager, so you can use it in a with statement.

with Client(faktory_url='tcp://localhost:7419') as client:
    pass  # Use the client here

Use the role argument to specify how you want to use the client. This argument has three possible values: 'consumer', 'producer', or 'both'.

# A client that acts as both a consumer and a producer.
client = Client(faktory_url='tcp://localhost:7419', role='both')

Producer

Use the client to push jobs.

Push job

with Client(faktory_url='tcp://localhost:7419', role='producer') as client:
    producer = Producer(client=client)
    job_1 = Job(jobtype='adder', args=(5, 4), queue='default')
    producer.push(job_1)

Push bulk jobs

You can push several jobs at once. There is no limit, but 1000 at a time is recommended as a best practice.

with Client(faktory_url='tcp://localhost:7419', role='producer') as client:
    producer = Producer(client=client)
    job_2 = Job(jobtype='adder', args=(50, 41))
    job_3 = Job(jobtype='adder', args=(15, 68))
    res = producer.push_bulk([job_2, job_3])
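
If you have a large list of jobs, here is a minimal sketch of pushing them in chunks of 1000 (the job list below is illustrative):

jobs = [Job(jobtype='adder', args=(i, i + 1)) for i in range(5000)]

with Client(faktory_url='tcp://localhost:7419', role='producer') as client:
    producer = Producer(client=client)
    # Push in chunks of 1000, following the recommendation above
    for start in range(0, len(jobs), 1000):
        producer.push_bulk(jobs[start:start + 1000])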

Consumer (Worker)

Use a worker to pull jobs from the Faktory server and execute them.

import logging

def adder(x, y):
    logging.info(f"{x} + {y} = {x + y}")

with Client(faktory_url='tcp://localhost:7419', role='consumer') as client:
    consumer = Consumer(client=client, queues=['default'], concurrency=1)
    consumer.register('adder', adder)
    consumer.run()

Use priority to indicate the order in which jobs should be fetched from the queues.

# With strict priority, there is a risk of starvation
consumer = Consumer(client=client, queues=['critical', 'default', 'bulk'], priority='strict')
# Each queue has an equal chance of being fetched first
consumer = Consumer(client=client, queues=['critical', 'default', 'bulk'], priority='uniform')
# Weights must be specified
consumer = Consumer(client=client, queues=['critical', 'default', 'bulk'], priority='weighted', weights=[0.6, 0.3, 0.1])

Capture exceptions using Sentry

To capture exceptions using Sentry before failing jobs, set the sentry_capture_exception argument to True.

consumer = Consumer(client=client, sentry_capture_exception=True)
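
A minimal sketch, assuming you initialize the Sentry SDK yourself before starting the worker (the DSN below is a placeholder):

import sentry_sdk

# Placeholder DSN; replace with your project's DSN
sentry_sdk.init(dsn="https://examplePublicKey@o0.ingest.sentry.io/0")

with Client(faktory_url='tcp://localhost:7419', role='consumer') as client:
    consumer = Consumer(client=client, queues=['default'],
                        sentry_capture_exception=True)
    consumer.register('adder', adder)
    consumer.run()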

Info

You can get various information about the server using the Client's info method.

with Client(faktory_url='tcp://localhost:7419') as client:
    server_info = client.info()
    print(server_info)

Mutate

A wrapper for the Mutate API to script certain repairs or migrations.

⚠️ MUTATE commands can be slow and/or resource intensive. They should not be used as part of your application logic.

from pyfaktory import Client, JobFilter, MutateOperation

client = Client(faktory_url='tcp://localhost:7419')
client.connect()

# Find all scheduled jobs with type `QuickbooksSyncJob` and discard them
op = MutateOperation(
    cmd='discard', 
    target='scheduled', 
    filter=JobFilter(jobtype="QuickbooksSyncJob")
)
client.mutate(op)

# Clear the Retry queue completely
op = MutateOperation(
    cmd='discard', 
    target='retries', 
    filter=JobFilter(regexp="*")
)
client.mutate(op)

# Clear the Retry queue completely (same effect, without a filter)
op = MutateOperation(
    cmd='discard', 
    target='retries'
)
client.mutate(op)

# Send two specific JIDs in the Retry queue to the Dead queue
op = MutateOperation(
    cmd='kill', 
    target='retries', 
    filter=JobFilter(jobtype="QuickbooksSyncJob", jids=["123456789", "abcdefgh"])
)
client.mutate(op)

# Enqueue all retries with a first argument matching "bob"
op = MutateOperation(
    cmd='requeue', 
    target='retries', 
    filter=JobFilter(regexp="*\"args\":[\"bob\"*")
)
client.mutate(op)

client.disconnect()

Queues

Pausing

Queues may be paused (no job can be fetched from them while paused) or unpaused (resume fetching).

# Pause a list of queues
client.queue_pause(queues=['bulk', 'another_queue'])

# Pause all queues
client.queue_pause(all_queues=True)


# Unpause a list of queues
client.queue_unpause(queues=['bulk'])

# Unpause all queues
client.queue_unpause(all_queues=True)

Remove

Queues can be removed, which deletes all jobs within them. Removing a queue does not stop currently executing jobs that came from it.

# Remove a list of queues
client.queue_remove(queues=['bulk'])

# Remove all queues
client.queue_remove(all_queues=True)

Batch (untested)

Refer to the official Faktory documentation on batches.

from pyfaktory import Batch, Client, Job, Producer, TargetJob

client = Client(faktory_url='tcp://localhost:7419')
client.connect()

producer = Producer(client=client)

batch = Batch(
    description="An optional description for the Web UI",
    success=TargetJob(jobtype="MySuccessCallback", args=[123], queue="critical"),
    complete=TargetJob(jobtype="MyCompleteCallback", args=['aa'], queue="critical")
)

# Create a new batch
bid = producer.batch_new(batch)  # batch_new is assumed here to return the new batch's bid

# Push as many jobs as necessary for the batch
# You may nest batches
# The initial batch data has a TTL of 30 minutes and will expire if the batch is not committed
producer.push(Job(jobtype='SomeJob', args=(5, 4), custom={"bid": bid}))
producer.push(Job(jobtype='SomeOtherJob', args=(0, 15), custom={"bid": bid}))

# Commit the batch
producer.batch_commit(bid)

client.disconnect()

Use batch_open to reopen a batch that has already been created.

producer.batch_open(bid)
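
For example, a sketch (untested, like the rest of this section; the jobtype is illustrative) of reopening a batch to add a late job and committing it again:

producer.batch_open(bid)
producer.push(Job(jobtype='SomeLateJob', args=(7, 2), custom={"bid": bid}))
producer.batch_commit(bid)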

Use parent_bid to create a child batch.

child_batch = Batch(
    parent_bid=bid,
    description="An optional description for the Web UI",
    success=TargetJob(jobtype="MySuccessCallback", args=[123], queue="critical"),
    complete=TargetJob(jobtype="MyCompleteCallback", args=['aa'], queue="critical")
)
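
A sketch (untested, and assuming batch_new returns the new bid as above) of creating the child batch, pushing a job into it, and committing both child and parent:

child_bid = producer.batch_new(child_batch)
producer.push(Job(jobtype='SomeChildJob', args=(1, 2), custom={"bid": child_bid}))
producer.batch_commit(child_bid)
producer.batch_commit(bid)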

Custom command

If you want to use a Faktory command that is not yet implemented in this client library, you can send custom commands.

from pyfaktory import Client

my_command = 'INFO\r\n'

with Client(faktory_url='tcp://localhost:7419') as client:
    resp = client._send_and_receive(my_command)
    print(resp)
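
As another illustration, the FWP FLUSH command wipes all data from the server, so only send it to a test instance:

my_command = 'FLUSH\r\n'

with Client(faktory_url='tcp://localhost:7419') as client:
    resp = client._send_and_receive(my_command)
    print(resp)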

Outer multiprocessing

You can pass your own multiprocessing context to the Consumer via the context argument.

import multiprocessing

def run_worker():
    try:
        with Client(faktory_url=faktory_server_url, role='consumer') as client:
            # Use a custom "spawn" multiprocessing context for the worker processes
            custom_context = multiprocessing.get_context("spawn")
            consumer = Consumer(client=client, queues=['t_test'], concurrency=2, context=custom_context)
            consumer.register('task_001', task_001)
            consumer.run()
    except Exception as e:
        print(f"task running error: {str(e)}")

Example

Find examples in ./examples.

  • Start the Faktory server.
    /usr/bin/faktory
  • Launch a producer.
    python examples/fproducer.py
  • Launch a consumer.
    python examples/fconsumer.py

Contribute

Issues

If you encounter a problem, please report it.

In addition to the description of your problem, report the server and client versions.

# Client version (from Python)
pyfaktory.__version__

# Server version (from a shell)
/usr/bin/faktory -v

Reproduce your problem while increasing the level of debugging for both the server and the client, and report the logs.

import logging

# Client side: enable debug logging
logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s',
                    level=logging.DEBUG,
                    datefmt='%Y-%m-%d %H:%M:%S')

# Server side: run Faktory with debug logging
/usr/bin/faktory -l debug

PRs

Please feel free to create pull requests.

License

See the LICENSE file for license rights and limitations (MIT).
