Zookeeper, Kafka server, and Kafka consumer fixtures for Pytest

Project description

Pytest fixture factories for Zookeeper, Kafka server and Kafka consumer. Read the API docs.

from functools import partial
from pathlib import Path
from subprocess import Popen

from pytest_kafka import (
    make_zookeeper_process, make_kafka_server, make_kafka_consumer,
    terminate,
)

ROOT = Path(__file__).parent
KAFKA_SCRIPTS = ROOT / 'kafka/bin/'
KAFKA_BIN = str(KAFKA_SCRIPTS / 'kafka-server-start.sh')
ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / 'zookeeper-server-start.sh')

# You can pass a custom teardown function (or parametrise ours). Just don't call it `teardown`
# or Pytest will interpret it as a module-scoped teardown function.
teardown_fn = partial(terminate, signal_fn=Popen.kill)
zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN, teardown_fn=teardown_fn)
kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc', teardown_fn=teardown_fn)
kafka_consumer = make_kafka_consumer(
    'kafka_server', seek_to_beginning=True, kafka_topics=['topic'])

This creates 3 fixtures:

  1. zookeeper_proc - Zookeeper process

  2. kafka_server - Kafka process

  3. kafka_consumer - usable kafka.KafkaConsumer instance

ZOOKEEPER_BIN and KAFKA_BIN are paths to launch scripts in your Kafka distribution. Check this project’s setup.py to see a way of installing Kafka for development.
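If the launch scripts are missing, the fixtures can only fail once a process is started. One possible safeguard, sketched here with the standard library only, is to check the paths when the test module is imported. The helper name require_script is hypothetical and not part of pytest_kafka.

```python
from pathlib import Path


def require_script(path):
    """Return the path as a string, or fail early if the launch script is missing."""
    script = Path(path)
    if not script.is_file():
        raise FileNotFoundError(
            'Launch script not found: {}. Is Kafka installed there?'.format(script))
    return str(script)
```

It could then wrap the path definitions, for example KAFKA_BIN = require_script(KAFKA_SCRIPTS / 'kafka-server-start.sh').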

It is advised to pass seek_to_beginning=True, because otherwise the consumer may miss some messages. This requires knowing the topics upfront, because without topics there are no partitions to seek.

Kafka server is known to take a couple of seconds to terminate gracefully. If you don't need a graceful shutdown, pass partial(terminate, signal_fn=Popen.kill) as the teardown function, so that the process is killed with SIGKILL and then waited for.
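To make the kill-then-wait pattern concrete, here is a minimal sketch using only the standard library. It is not the library's terminate implementation; kill_and_wait is a hypothetical helper, and a sleeping Python child stands in for a slow-to-stop server.

```python
import subprocess
import sys


def kill_and_wait(proc, timeout=10):
    """Send SIGKILL (via Popen.kill) and reap the process; return its exit code."""
    proc.kill()
    return proc.wait(timeout=timeout)


# Stand-in for a slow server: a child that would otherwise sleep for a minute.
child = subprocess.Popen([sys.executable, '-c', 'import time; time.sleep(60)'])
exit_code = kill_and_wait(child)
```

Waiting after the kill matters: it reaps the child so no zombie process is left behind between tests.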

It’s possible to create multiple Kafka fixtures forming a cluster by passing the same Zookeeper fixture to them. For an example, check the tests.

Session-scoped fixtures are also available. Consult the test suite.

System requirements

Development

pip install -e .[dev]
./pytest_kafka/install.py  # will install kafka to ./kafka

Acknowledgements

The library has been open-sourced from a codebase belonging to Infectious Media.
