Custom sources and sinks for QuixStreams.
Quixplus
This project contains custom data sources for streaming data to QuixStreams using:
- WebSocket: Streams data from WebSocket endpoints.
- CSV: Streams data from CSV files.
- HTTP: Polls data from HTTP APIs at scheduled intervals.
Each source is designed to integrate with QuixStreams for real-time data streaming in Python applications.
Getting Started
This project requires:
- Python 3.8 or higher
- QuixStreams library
- Access to your data sources (WebSocket endpoints, HTTP APIs, or CSV files)
Environment Setup
Clone this repository:
git clone https://github.com/your-username/your-repo.git
cd your-repo
Install the dependencies:
pip install -r requirements.txt
Set up your environment variables:
The application factory method reads its Kafka connection settings from the following environment variables, so export them before running the application.
export BOOTSTRAP_SERVERS=<your-bootstrap-servers>
export SASL_USERNAME=<your-username>
export SASL_PASSWORD=<your-password>
export SASL_MECHANISM=<your-sasl-mechanism>
export SECURITY_PROTOCOL=<your-security-protocol>
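Before starting the application, it can help to check that all five variables are actually set. The following is a minimal sketch using only the standard library; the helper name load_kafka_settings is hypothetical and not part of quixplus, and how the resulting settings are passed to QuixStreams is left out on purpose.

```python
import os

# Variable names come from the export lines above.
# The helper itself is a hypothetical illustration, not a quixplus API.
REQUIRED_VARS = (
    "BOOTSTRAP_SERVERS",
    "SASL_USERNAME",
    "SASL_PASSWORD",
    "SASL_MECHANISM",
    "SECURITY_PROTOCOL",
)


def load_kafka_settings(env=None):
    """Return the connection settings as a dict, or raise if any are missing."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    # Lower-case the names so they read like ordinary keyword arguments.
    return {name.lower(): env[name] for name in REQUIRED_VARS}
```

Calling load_kafka_settings() with no argument reads from the process environment; passing a plain dict makes the check easy to exercise in tests.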
Data Sources
WebSocket Source
The WebSocketSource class connects to a WebSocket endpoint, receives data, applies optional transformations, and streams it to QuixStreams.
Key Features:
- Connects to WebSocket URLs.
- Validates and transforms incoming messages.
- Sends data to a specified Kafka topic.
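To make the validate-then-transform step concrete, here is a rough sketch of what such callbacks might look like for a JSON feed. The function names, the "symbol" and "price" fields, and the way the callbacks would be handed to WebSocketSource are all assumptions for illustration; check the class signature for the actual parameters.

```python
import json


def validate(message):
    """Accept only JSON objects that carry a 'symbol' field (hypothetical rule)."""
    return isinstance(message, dict) and "symbol" in message


def transform(message):
    """Keep the fields of interest and normalise the symbol to upper case."""
    return {"symbol": str(message["symbol"]).upper(), "price": message.get("price")}


def handle_raw(raw):
    """Parse one raw WebSocket frame; return the transformed record or None."""
    try:
        message = json.loads(raw)
    except json.JSONDecodeError:
        return None  # not JSON at all: drop the frame
    if not validate(message):
        return None  # valid JSON but not the shape we expect
    return transform(message)
```

Returning None for rejected frames keeps the decision to skip a message in one place, so the code that produces to Kafka only has to check for None.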
CSV Source
The CSVSource class reads data from a CSV file, transforms each row, and streams it to QuixStreams.
Key Features:
- Reads from CSV files with configurable delimiters.
- Extracts each row as a JSON object, using column headers as keys.
- Supports configurable keys for Kafka messages and value serialization.
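The row-to-message mapping described above can be approximated with the standard library alone. This is a sketch of the idea, not the CSVSource implementation: the function name rows_to_messages and the choice to join key columns with an underscore are assumptions.

```python
import csv
import io
import json


def rows_to_messages(csv_file, key_columns, delimiter=","):
    """Yield one (key, value) pair per CSV row.

    The value is the row serialised as a JSON object keyed by column header;
    the key joins the chosen columns with "_" (an assumed convention).
    """
    reader = csv.DictReader(csv_file, delimiter=delimiter)
    for row in reader:
        key = "_".join(row[column] for column in key_columns)
        yield key, json.dumps(row)


# Usage with an in-memory file and a semicolon delimiter.
sample = io.StringIO("id;city;temp\n1;Oslo;4\n2;Rome;18\n")
messages = list(rows_to_messages(sample, ["id", "city"], delimiter=";"))
```

Note that csv.DictReader returns every field as a string, so numeric columns need an explicit conversion if the consumer expects numbers.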
HTTP Source
The AIOHTTPSource class polls an HTTP API endpoint at specified intervals, validates the data, and streams it to QuixStreams.
Key Features:
- Asynchronous polling with configurable intervals.
- Flexible authentication (Bearer token, Basic, or custom headers).
- Supports configurable JSON path extraction for Kafka keys and values.
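The three authentication styles listed above differ only in the request headers they produce. The sketch below shows one way to build those headers; the helper name build_auth_headers and the "basic" and "custom" type strings are assumptions, since only "bearer" appears in this README.

```python
import base64


def build_auth_headers(auth_type, credentials):
    """Return HTTP headers for the given authentication style.

    bearer: credentials is the token string.
    basic:  credentials is a (username, password) pair.
    custom: credentials is a mapping of header names to values.
    """
    if auth_type == "bearer":
        return {"Authorization": f"Bearer {credentials}"}
    if auth_type == "basic":
        username, password = credentials
        raw = f"{username}:{password}".encode("utf-8")
        token = base64.b64encode(raw).decode("ascii")
        return {"Authorization": f"Basic {token}"}
    if auth_type == "custom":
        return dict(credentials)  # copy so the caller's mapping is not mutated
    raise ValueError(f"Unsupported auth_type: {auth_type!r}")
```

Keeping header construction in a pure function like this makes it easy to test without issuing any HTTP requests.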
Installation:
Install from PyPI:
pip install quixplus
Add your environment variables to the .env file.
Usage
WebSocket Source Example
To use the WebSocket source, initialize it with the necessary configurations:
from quixstreams import Application
from custom_sources.websocket_source import WebSocketSource

app = Application(broker_address="your_kafka_broker")

ws_source = WebSocketSource(
    topic="your_topic",
    ws_url="wss://your_websocket_endpoint",
    auth_payload={"api_key": "your_key"},
    subscribe_payload={"action": "subscribe", "params": "A.*"}
)

sdf = app.dataframe(source=ws_source)
sdf.print()
app.run()
CSV Source Example
To use the CSV source, provide the path to your CSV file and specify the columns for Kafka keys:
from quixstreams import Application
from custom_sources.csv_source import CSVSource

app = Application(broker_address="your_kafka_broker")

csv_source = CSVSource(
    topic="your_topic",
    csv_path="path/to/your/csv/file.csv",
    key_columns=["column1", "column2"]
)

sdf = app.dataframe(source=csv_source)
sdf.print()
app.run()
HTTP Source Example
To use the HTTP source, set the URL and polling interval:
from quixstreams import Application
from custom_sources.http_source import AIOHTTPSource

app = Application(broker_address="your_kafka_broker")

http_source = AIOHTTPSource(
    url="https://your-api-endpoint",
    poll_interval=10,
    auth_type="bearer",
    auth_credentials="your_bearer_token",
    key_json_path="$.data.id",
    value_json_path="$.data"
)

sdf = app.dataframe(source=http_source)
sdf.print()
app.run()
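To illustrate what paths such as "$.data.id" and "$.data" select from an API response, here is a deliberately simplified resolver. It handles only dotted paths through nested objects and is not a full JSONPath implementation; the function name extract and the sample payload are hypothetical, and the library may well rely on a dedicated JSONPath package instead.

```python
def extract(document, path):
    """Resolve a simple '$.a.b' path against nested dicts.

    Returns None when any segment is missing. Array indices, wildcards,
    and filters are intentionally not supported in this sketch.
    """
    if path == "$":
        return document
    if not path.startswith("$."):
        raise ValueError(f"Path must start with '$.': {path!r}")
    current = document
    for part in path[2:].split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


# Hypothetical response body from the polled endpoint.
payload = {"data": {"id": 42, "temp": 21.5}}
key = extract(payload, "$.data.id")
value = extract(payload, "$.data")
```

With this payload, the key path selects the record identifier and the value path selects the whole "data" object, which mirrors how the two parameters in the example above divide the response between Kafka key and value.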
Contributing
We welcome contributions! Please read our CONTRIBUTING.md for guidelines on contributing to this project.
License
This project is licensed under the MIT License.