Quixplus is a Python library for building sources and sinks for Quix Streams.

Quixplus

This project contains custom data sources that stream data into QuixStreams from:

  • WebSocket: Streams data from WebSocket endpoints.
  • CSV: Streams data from CSV files.
  • HTTP: Polls data from HTTP APIs at scheduled intervals.

Each source is designed to integrate with QuixStreams for real-time data streaming in Python applications.

Getting Started

This project requires:

  • Python 3.8 or higher
  • QuixStreams library
  • Access to your data sources (WebSocket endpoints, HTTP APIs, or CSV files)

Environment Setup

Clone this repository:

git clone https://github.com/your-username/your-repo.git
cd your-repo

Install the dependencies:

pip install -r requirements.txt

Set up your environment variables, which the application factory method requires:

export BOOTSTRAP_SERVERS=<your-bootstrap-servers>
export SASL_USERNAME=<your-username>
export SASL_PASSWORD=<your-password>
export SASL_MECHANISM=<your-sasl-mechanism>
export SECURITY_PROTOCOL=<your-security-protocol>
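
As a minimal sketch of how these variables might be consumed, the helper below collects them into a dictionary and fails early when one is missing. The variable names come from the export commands above; the function name `load_kafka_settings` and the shape of its return value are illustrative assumptions, not part of Quixplus.

```python
import os

# Variable names taken from the export commands above.
REQUIRED_VARS = (
    "BOOTSTRAP_SERVERS",
    "SASL_USERNAME",
    "SASL_PASSWORD",
    "SASL_MECHANISM",
    "SECURITY_PROTOCOL",
)


def load_kafka_settings(env=None):
    """Collect the connection settings, failing fast if any are missing.

    Hypothetical helper: Quixplus may read these variables differently.
    """
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    # Lower-case the names so the result reads like keyword arguments.
    return {name.lower(): env[name] for name in REQUIRED_VARS}
```

Checking for missing values at startup gives a clear error message instead of a connection failure later on.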

Data Sources

WebSocket Source

The WebSocketSource class connects to a WebSocket endpoint, receives data, applies optional transformations, and streams it to QuixStreams.

Key Features:

  • Connects to WebSocket URLs.
  • Validates and transforms incoming messages.
  • Sends data to a specified Kafka topic.
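
The validate-then-transform step can be pictured with the following sketch. It uses only the standard library and assumes a made-up message schema with `symbol` and `price` fields; the function names are hypothetical and do not reflect how WebSocketSource is actually configured.

```python
import json


def validate(message):
    """Accept only dict messages with a symbol and a numeric price (assumed schema)."""
    return (
        isinstance(message, dict)
        and bool(message.get("symbol"))
        and isinstance(message.get("price"), (int, float))
    )


def transform(message):
    """Keep the fields of interest and normalise the price to float."""
    return {"symbol": message["symbol"], "price": float(message["price"])}


def handle_raw_message(raw):
    """Parse one text frame; return the transformed record, or None to drop it."""
    try:
        message = json.loads(raw)
    except json.JSONDecodeError:
        return None  # not JSON at all
    if not validate(message):
        return None  # JSON, but not a message we want
    return transform(message)
```

Returning `None` for rejected frames lets the caller skip them without raising, so one malformed message does not stop the stream.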

CSV Source

The CSVSource class reads data from a CSV file, transforms each row, and streams it to QuixStreams.

Key Features:

  • Reads from CSV files with configurable delimiters.
  • Extracts each row as a JSON object, using column headers as keys.
  • Supports configurable keys for Kafka messages and value serialization.
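
To illustrate the row-to-JSON behaviour described above, here is a small standard-library sketch. It is not the CSVSource implementation; the function name `rows_as_json` and the sample data are assumptions made for the example.

```python
import csv
import io
import json


def rows_as_json(csv_text, delimiter=","):
    """Yield each data row as a JSON string keyed by the header row."""
    reader = csv.DictReader(io.StringIO(csv_text), delimiter=delimiter)
    for row in reader:
        yield json.dumps(row)


# Semicolon-delimited sample with a header line.
sample = "id;name\n1;alpha\n2;beta\n"
for line in rows_as_json(sample, delimiter=";"):
    print(line)
```

Note that `csv.DictReader` leaves every value as a string, so numeric columns need an explicit conversion if downstream consumers expect numbers.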

HTTP Source

The AIOHTTPSource class polls an HTTP API endpoint at specified intervals, validates the data, and streams it to QuixStreams.

Key Features:

  • Asynchronous polling with configurable intervals.
  • Flexible authentication (Bearer token, Basic, or custom headers).
  • Supports configurable JSON path extraction for Kafka keys and values.
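
The three authentication styles listed above usually come down to different request headers. The sketch below shows one way to build them with the standard library; the function name, its parameters, and the tuple format for Basic credentials are assumptions for illustration, not the AIOHTTPSource signature.

```python
import base64


def build_auth_headers(auth_type=None, credentials=None, custom_headers=None):
    """Return request headers for bearer, basic, or custom-header auth.

    Hypothetical helper: `credentials` is a token string for "bearer"
    and a (username, password) tuple for "basic".
    """
    headers = dict(custom_headers or {})
    if auth_type == "bearer":
        headers["Authorization"] = f"Bearer {credentials}"
    elif auth_type == "basic":
        username, password = credentials
        raw = f"{username}:{password}".encode("utf-8")
        headers["Authorization"] = "Basic " + base64.b64encode(raw).decode("ascii")
    elif auth_type is not None:
        raise ValueError(f"Unsupported auth_type: {auth_type!r}")
    return headers
```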

Installation:

Install from PyPI:

pip install quixplus

Add your environment variables to the .env file.
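
If you prefer not to add a dependency for reading the .env file, a few lines of Python are enough for simple `KEY=VALUE` files. The loader below is a minimal sketch with a hypothetical name; it does not handle multi-line values or variable expansion, and Quixplus may load the file in a different way.

```python
import os


def load_dotenv_text(text, env=None):
    """Copy KEY=VALUE lines into env, skipping blank lines and # comments."""
    env = os.environ if env is None else env
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        # setdefault keeps values that are already set in the environment.
        env.setdefault(key.strip(), value.strip().strip('"'))
    return env
```

Typical use would be `load_dotenv_text(open(".env").read())` near the start of the program, before the application is created.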

Usage

WebSocket Source Example

To use the WebSocket source, initialize it with the necessary configurations:

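from quixstreams import Application
from custom_sources.websocket_source import WebSocketSource

# Connect the application to the Kafka broker.
app = Application(broker_address="your_kafka_broker")

# Configure the source: target topic, endpoint, and the payloads used
# for authentication and subscription.
ws_source = WebSocketSource(
    topic="your_topic",
    ws_url="wss://your_websocket_endpoint",
    auth_payload={"api_key": "your_key"},
    subscribe_payload={"action": "subscribe", "params": "A.*"}
)

# Build a streaming dataframe from the source, print each record, and run.
sdf = app.dataframe(source=ws_source)
sdf.print()
app.run()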
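
The example does not say how `auth_payload` and `subscribe_payload` are used. One plausible reading, and only an assumption here, is that each is serialized to JSON and sent as a text frame once the connection opens, with authentication first. The hypothetical helper below shows what those frames would look like under that assumption.

```python
import json

auth_payload = {"api_key": "your_key"}
subscribe_payload = {"action": "subscribe", "params": "A.*"}


def opening_frames(auth=None, subscribe=None):
    """Return the JSON text frames to send after connecting, auth first.

    Assumed behaviour for illustration; payloads left as None are skipped.
    """
    return [json.dumps(p) for p in (auth, subscribe) if p is not None]


for frame in opening_frames(auth_payload, subscribe_payload):
    print(frame)
```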

CSV Source Example

To use the CSV source, provide the path to your CSV file and specify the columns for Kafka keys:

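from quixstreams import Application
from custom_sources.csv_source import CSVSource

# Connect the application to the Kafka broker.
app = Application(broker_address="your_kafka_broker")

# Configure the source: target topic, file path, and the columns that
# form the Kafka message key.
csv_source = CSVSource(
    topic="your_topic",
    csv_path="path/to/your/csv/file.csv",
    key_columns=["column1", "column2"]
)

# Build a streaming dataframe from the source, print each record, and run.
sdf = app.dataframe(source=csv_source)
sdf.print()
app.run()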
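
How the values of `key_columns` are combined into a single Kafka key is not specified above. A common approach, shown here purely as an assumption, is to join them with a separator; the helper name and the underscore separator are illustrative.

```python
def build_key(row, key_columns, separator="_"):
    """Join the values of key_columns into one key string (assumed format)."""
    missing = [c for c in key_columns if c not in row]
    if missing:
        raise KeyError(f"Row is missing key columns: {missing}")
    return separator.join(str(row[c]) for c in key_columns)


row = {"column1": "EU", "column2": "42", "value": "3.14"}
print(build_key(row, ["column1", "column2"]))  # EU_42
```

Rows that share the same key end up in the same Kafka partition, so the choice of key columns determines which records keep their relative order.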

HTTP Source Example

To use the HTTP source, set the URL and polling interval:

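from quixstreams import Application
from custom_sources.http_source import AIOHTTPSource

# Connect the application to the Kafka broker.
app = Application(broker_address="your_kafka_broker")

# Configure the source: endpoint, polling interval, authentication, and
# the JSON paths that select the Kafka key and value from each response.
http_source = AIOHTTPSource(
    url="https://your-api-endpoint",
    poll_interval=10,
    auth_type="bearer",
    auth_credentials="your_bearer_token",
    key_json_path="$.data.id",
    value_json_path="$.data"
)

# Build a streaming dataframe from the source, print each record, and run.
sdf = app.dataframe(source=http_source)
sdf.print()
app.run()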
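
To make the two JSON paths concrete, the sketch below resolves simple dotted paths such as `$.data.id` against a decoded response. It is a simplified stand-in written for this example: it supports only `$` followed by dot-separated object keys, the response body is invented, and the library may use a full JSONPath implementation instead.

```python
def extract_path(document, path):
    """Resolve a simple '$.a.b' path; return None if any step is missing."""
    if not path.startswith("$"):
        raise ValueError("path must start with '$'")
    current = document
    for part in path[1:].split("."):
        if not part:
            continue  # skips the empty piece before the first dot
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


# Invented response body, used only to show what each path selects.
response = {"data": {"id": 7, "price": 1.5}}
key = extract_path(response, "$.data.id")    # selects 7
value = extract_path(response, "$.data")     # selects {"id": 7, "price": 1.5}
```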

Contributing

We welcome contributions! Please read our CONTRIBUTING.md for guidelines on contributing to this project.

License

This project is licensed under the MIT License.
