listeners bolts for geniusrise
Project description
Listeners
This is a collection of generic streaming listeners (Spouts).
Table of Contents
Includes:
No. | Name | Description | Output Type | Input Type |
---|---|---|---|---|
1 | Webhook | Cherrypy server that accepts all HTTP API calls | Streaming | HTTP |
2 | Kafka | Kafka client that listens to a topic | Streaming | Kafka |
3 | Websocket | Websocket server that listens to a socket | Streaming | Websocket |
4 | UDP | UDP server that listens to a given port | Streaming | UDP |
5 | QUIC | Aioquic server that listens to a given port | Streaming | QUIC |
6 | HTTP Polling | HTTP client that keeps polling an API | Streaming | HTTP |
7 | RabbitMQ / AMQP | RabbitMQ client that listens to a given queue | Streaming | AMQP |
8 | MQTT | MQTT client that subscribes and listens to a topic | Streaming | MQTT |
9 | Redis Pub-Sub | Redis client that subscribes to a Pub/Sub channel | Streaming | Redis Pub-Sub |
10 | Redis Streams | Redis client that listens to a stream | Streaming | Redis Streams |
11 | AWS SNS | AWS client that listens to SNS notifications | Streaming | AWS SNS |
12 | AWS SQS | AWS client that listens to messages from an SQS queue | Streaming | AWS SQS |
13 | SocketIo | SocketIo client that listens to a namespace | Streaming | SocketIo |
14 | ActiveMQ | ActiveMQ client that listens to a queue | Streaming | ActiveMQ |
15 | Kinesis | Kinesis client that listens to a stream | Streaming | Kinesis |
16 | Grpc | gRPC client that listens to a server | Streaming | gRPC |
17 | ZeroMQ | ZeroMQ client that listens to a topic | Streaming | ZeroMQ |
Usage
To test, first bring up all related services via the supplied docker-compose:
docker compose up -d
docker compose logs -f
These management consoles will be available:
Console | Link |
---|---|
Kafka UI | http://localhost:8088/ |
RabbitMQ UI | http://localhost:15672/ |
Localstack API | http://localhost:4566 |
Postgres can be accessed with:
docker exec -it geniusrise-postgres-1 psql -U postgres
Webhooks
genius Webhook rise \
streaming \
--output_kafka_topic webhook_test \
--output_kafka_cluster_connection_string localhost:9094 \
postgres \
--postgres_host 127.0.0.1 \
--postgres_port 5432 \
--postgres_user postgres \
--postgres_password postgres \
--postgres_database geniusrise \
--postgres_table state \
listen \
--args port=3001
Test:
curl -X POST \
-H "Content-Type: application/json" \
-d '{"lol": "teeeestss"}' \
http://localhost:3001
Kafka
genius Kafka rise \
streaming \
--output_kafka_topic kafka_test \
--output_kafka_cluster_connection_string localhost:9094 \
postgres \
--postgres_host 127.0.0.1 \
--postgres_port 5432 \
--postgres_user postgres \
--postgres_password postgres \
--postgres_database geniusrise \
--postgres_table state \
listen \
--args topic=kafka_test_input group_id=kafka_test bootstrap_servers=localhost:9094
Websockets
genius Websocket rise \
streaming \
--output_kafka_topic websocket_test \
--output_kafka_cluster_connection_string localhost:9094 \
postgres \
--postgres_host 127.0.0.1 \
--postgres_port 5432 \
--postgres_user postgres \
--postgres_password postgres \
--postgres_database geniusrise \
--postgres_table state \
listen \
--args host=localhost port=3002
Test:
cargo install websocat
echo '{"lol": "heheheheheheheh"}' | websocat ws://localhost:3002
UDP
genius Udp rise \
streaming \
--output_kafka_topic udp_test \
--output_kafka_cluster_connection_string localhost:9094 \
postgres \
--postgres_host 127.0.0.1 \
--postgres_port 5432 \
--postgres_user postgres \
--postgres_password postgres \
--postgres_database geniusrise \
--postgres_table state \
listen \
--args host=localhost port=3003
Test:
yay -S netcat
echo -n '{"key": "value"}' | nc -u -w1 localhost 3003
QUIC
openssl genpkey -algorithm RSA -out ~/.ssh/quic_key.pem
openssl req -new -x509 -key ~/.ssh/quic_key.pem -out ~/.ssh/quic_cert.pem -days 365 -subj "/CN=localhost"
genius Quic rise \
streaming \
--output_kafka_topic udp_test \
--output_kafka_cluster_connection_string localhost:9094 \
postgres \
--postgres_host 127.0.0.1 \
--postgres_port 5432 \
--postgres_user postgres \
--postgres_password postgres \
--postgres_database geniusrise \
--postgres_table state \
listen \
--args \
cert_path=/home/ixaxaar/.ssh/quic_cert.pem \
key_path=/home/ixaxaar/.ssh/quic_key.pem \
host=localhost \
port=3004
Test:
go install github.com/spacewander/quick@latest
quick -insecure https://localhost:3004
REST API polling
genius RESTAPIPoll rise \
streaming \
--output_kafka_topic poll_test \
--output_kafka_cluster_connection_string localhost:9094 \
postgres \
--postgres_host 127.0.0.1 \
--postgres_port 5432 \
--postgres_user postgres \
--postgres_password postgres \
--postgres_database geniusrise \
--postgres_table state \
listen \
--args \
url=https://reqres.in/api/users \
method=GET \
interval=6 \
body="" \
headers='{"content-type": "application/json"}' \
params='{"page": 2}'
RabbitMQ / AMQP
genius RabbitMQ rise \
streaming \
--output_kafka_topic rabbitmq_test \
--output_kafka_cluster_connection_string localhost:9094 \
postgres \
--postgres_host 127.0.0.1 \
--postgres_port 5432 \
--postgres_user postgres \
--postgres_password postgres \
--postgres_database geniusrise \
--postgres_table state \
listen \
--args queue_name=geniusrise_test host=localhost username=admin password=admin
Test:
Go to http://localhost:15672/#/queues/%2F/geniusrise_test
MQTT
genius MQTT rise \
streaming \
--output_kafka_topic mqtt_test \
--output_kafka_cluster_connection_string localhost:9094 \
postgres \
--postgres_host 127.0.0.1 \
--postgres_port 5432 \
--postgres_user postgres \
--postgres_password postgres \
--postgres_database geniusrise \
--postgres_table state \
listen \
--args host=localhost port=1883 topic=geniusrise_test
snap install mqtt-explorer # GUI, create a topic, send a message
or
docker exec -it streaming-spouts-mosquitto-1 mosquitto_pub -h 127.0.0.1 -t "geniusrise_test" -m '{"test": "mqtt message"}'
Redis Pub-Sub
genius RedisPubSub rise \
streaming \
--output_kafka_topic redispubsub_test \
--output_kafka_cluster_connection_string localhost:9094 \
postgres \
--postgres_host 127.0.0.1 \
--postgres_port 5432 \
--postgres_user postgres \
--postgres_password postgres \
--postgres_database geniusrise \
--postgres_table state \
listen \
--args channel=geniusrise_test host=localhost port=6380
Test:
redis-cli PUBLISH geniusrise_test '{"test": "redis pubsub message"}'
Redis Streams
genius RedisStream rise \
streaming \
--output_kafka_topic redisstream_test \
--output_kafka_cluster_connection_string localhost:9094 \
postgres \
--postgres_host 127.0.0.1 \
--postgres_port 5432 \
--postgres_user postgres \
--postgres_password postgres \
--postgres_database geniusrise \
--postgres_table state \
listen \
--args stream_key=geniusrise_test host=localhost
Test:
redis-cli XADD geniusrise_test * test "redis stream message"
AWS SNS
genius SNS rise \
streaming \
--output_kafka_topic sns_test \
--output_kafka_cluster_connection_string localhost:9094 \
postgres \
--postgres_host 127.0.0.1 \
--postgres_port 5432 \
--postgres_user postgres \
--postgres_password postgres \
--postgres_database geniusrise \
--postgres_table state \
listen
Test:
aws sns create-topic --name geniusrise_test
aws sns publish --topic-arn arn:aws:sns:ap-south-1:866011655254:geniusrise_test --message '{"test": "sns message"}'
AWS SQS
genius SQS rise \
streaming \
--output_kafka_topic sqs_test \
--output_kafka_cluster_connection_string localhost:9094 \
postgres \
--postgres_host 127.0.0.1 \
--postgres_port 5432 \
--postgres_user postgres \
--postgres_password postgres \
--postgres_database geniusrise \
--postgres_table state \
listen \
--args queue_url=https://sqs.ap-south-1.amazonaws.com/866011655254/geniusrise_test
Test:
aws sqs send-message --queue-url https://sqs.ap-south-1.amazonaws.com/866011655254/geniusrise_test --message-body '{"test": "sqs message"}'
socket.io
genius SocketIo rise \
streaming \
--output_kafka_topic socketio_test \
--output_kafka_cluster_connection_string localhost:9094 \
postgres \
--postgres_host 127.0.0.1 \
--postgres_port 5432 \
--postgres_user postgres \
--postgres_password postgres \
--postgres_database geniusrise \
--postgres_table state \
listen \
--args url=http://localhost:3000 namespace=/chat
Test:
# Use a SocketIo client to emit a message to the specified namespace.
ActiveMQ
genius ActiveMQ rise \
streaming \
--output_kafka_topic activemq_test \
--output_kafka_cluster_connection_string localhost:9094 \
postgres \
--postgres_host 127.0.0.1 \
--postgres_port 5432 \
--postgres_user postgres \
--postgres_password postgres \
--postgres_database geniusrise \
--postgres_table state \
listen \
--args host=localhost port=61613 destination=my_queue
Test:
# Use an ActiveMQ client to send a message to the specified destination.
Kinesis
genius Kinesis rise \
streaming \
--output_kafka_topic kinesis_test \
--output_kafka_cluster_connection_string localhost:9094 \
postgres \
--postgres_host 127.0.0.1 \
--postgres_port 5432 \
--postgres_user postgres \
--postgres_password postgres \
--postgres_database geniusrise \
--postgres_table state \
listen \
--args stream_name=my_stream shard_id=shardId-000000000000
Test:
# Use the AWS CLI or SDK to put a record into the specified Kinesis stream.
Grpc
genius Grpc rise \
streaming \
--output_kafka_topic grpc_test \
--output_kafka_cluster_connection_string localhost:9094 \
postgres \
--postgres_host 127.0.0.1 \
--postgres_port 5432 \
--postgres_user postgres \
--postgres_password postgres \
--postgres_database geniusrise \
--postgres_table state \
listen \
--args server_address=localhost:50051 request_data=my_request syntax=proto3
Test:
# Use a gRPC client to send a message to the specified server address.
ZeroMQ
genius ZeroMQ rise \
streaming \
--output_kafka_topic zmq_test \
--output_kafka_cluster_connection_string localhost:9094 \
postgres \
--postgres_host 127.0.0.1 \
--postgres_port 5432 \
--postgres_user postgres \
--postgres_password postgres \
--postgres_database geniusrise \
--postgres_table state \
listen \
--args endpoint=tcp://localhost:5555 topic=my_topic syntax=json
Test:
# Use a ZeroMQ client to send a message to the specified endpoint and topic.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file geniusrise-listeners-0.1.7.tar.gz
.
File metadata
- Download URL: geniusrise-listeners-0.1.7.tar.gz
- Upload date:
- Size: 19.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.10.12
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 33ace1c86cc4b13e79fec944cff244d25b78a8cd3d8e7da821fda6adb6b6613b |
|
MD5 | 56bb0ce96c712ae7275d661b2e0be12a |
|
BLAKE2b-256 | 48a5adcaf49cea16ca2ce9d9b7e47b031f86163720bf92e01dcb035861e22f9e |
File details
Details for the file geniusrise_listeners-0.1.7-py3-none-any.whl
.
File metadata
- Download URL: geniusrise_listeners-0.1.7-py3-none-any.whl
- Upload date:
- Size: 36.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.10.12
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 208a9cbf1843163e31914e5d49e58ed6139f798003e5c303f54356c4fdbd0dfa |
|
MD5 | a89d3731aeab5292a911bfbb8d2f35dc |
|
BLAKE2b-256 | cb90751217bbea57fdb9596022fbb690c865b0c273ae20b7a3b9cf5f31411458 |