# Data Pipeline Clientlib
What is it?
-----------
Data Pipeline Clientlib provides an interface to tail and publish to data pipeline topics.
[Read More](https://engineeringblog.yelp.com/2016/07/billions-of-messages-a-day-yelps-real-time-data-pipeline.html)
How to download
---------------
```
git clone git@github.com:Yelp/data_pipeline.git
```
Tests
-----
Running unit tests
```
make -f Makefile-opensource test
```
Configuration
-------------
Include the `data_pipeline` namespace in the `module_env_config` section of your `config.yaml`
and configure the following values for `kafka_ip`, `zk_ip` and `schematizer_ip`:
```
module_env_config:
    ...
    - namespace: data_pipeline
      config:
          kafka_broker_list:
              - <kafka_ip>:9092
          kafka_zookeeper: <zk_ip>:2181
          schematizer_host_and_port: <schematizer_ip>:8888
    ...
```
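For example, a local development setup might fill in the placeholders with loopback addresses (the hosts and ports below are illustrative values, not defaults shipped with the library):
```
module_env_config:
    - namespace: data_pipeline
      config:
          kafka_broker_list:
              - 127.0.0.1:9092
          kafka_zookeeper: 127.0.0.1:2181
          schematizer_host_and_port: 127.0.0.1:8888
```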
Usage
-----
Registering a simple schema with the Schematizer service.
```
from data_pipeline.schematizer_clientlib.schematizer import get_schematizer

test_avro_schema_json = {
    "type": "record",
    "namespace": "test_namespace",
    "source": "test_source",
    "name": "test_name",
    "doc": "test_doc",
    "fields": [
        {"type": "string", "doc": "test_doc1", "name": "key1"},
        {"type": "string", "doc": "test_doc2", "name": "key2"}
    ]
}

schema_info = get_schematizer().register_schema_from_schema_json(
    namespace="test_namespace",
    source="test_source",
    schema_json=test_avro_schema_json,
    source_owner_email="test@test.com",
    contains_pii=False
)
```
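The returned `schema_info` carries the `schema_id` that producers attach to every message encoded with this schema. As a quick sanity check, the registered schema can be fetched back by id; this is a minimal sketch assuming the client exposes a `get_schema_by_id` lookup (verify against your installed version):
```
from data_pipeline.schematizer_clientlib.schematizer import get_schematizer

# `get_schema_by_id` is assumed here; confirm it exists in your version
# of the clientlib before relying on it.
registered_schema = get_schematizer().get_schema_by_id(schema_info.schema_id)
```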
Creating a simple Data Pipeline Message from payload data.
```
from data_pipeline.message import Message

message = Message(
    schema_id=schema_info.schema_id,
    payload_data={
        'key1': 'value1',
        'key2': 'value2'
    }
)
```
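Since each `Message` wraps a single payload, a batch of payloads can be wrapped in a loop; this sketch uses only the constructor shown above:
```
from data_pipeline.message import Message

payloads = [
    {'key1': 'a', 'key2': 'b'},
    {'key1': 'c', 'key2': 'd'},
]
# One Message per payload, all encoded against the same registered schema.
messages = [
    Message(schema_id=schema_info.schema_id, payload_data=payload)
    for payload in payloads
]
```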
Starting a Producer and publishing messages with it.
```
from data_pipeline.producer import Producer

with Producer() as producer:
    producer.publish(message)
```
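Publishing the batch built above works the same way; this sketch assumes, as is typical for context-managed producers, that leaving the `with` block flushes any buffered messages:
```
from data_pipeline.producer import Producer

with Producer() as producer:
    for message in messages:
        producer.publish(message)
# Exiting the block closes the producer, which should flush pending messages.
```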
Starting a Consumer named `my_consumer` that listens for
messages in all topics within the `test_namespace` namespace and `test_source` source.
In this example, the consumer consumes one message at a time, processes it, and
commits the offset.
```
from data_pipeline.consumer import Consumer
from data_pipeline.consumer_source import TopicInSource

consumer_source = TopicInSource("test_namespace", "test_source")
with Consumer(
    consumer_name='my_consumer',
    team_name='bam',
    expected_frequency_seconds=12345,
    consumer_source=consumer_source
) as consumer:
    while True:
        message = consumer.get_message()
        if message is not None:
            # ... do stuff with message ...
            consumer.commit_message(message)
```
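A bounded variant of the same loop, useful for one-off backfills or tests, stops after a fixed number of messages instead of polling forever; it uses only the `get_message`/`commit_message` calls shown above:
```
from data_pipeline.consumer import Consumer
from data_pipeline.consumer_source import TopicInSource

consumer_source = TopicInSource("test_namespace", "test_source")
with Consumer(
    consumer_name='my_consumer',
    team_name='bam',
    expected_frequency_seconds=12345,
    consumer_source=consumer_source
) as consumer:
    handled = 0
    while handled < 100:
        message = consumer.get_message()
        if message is None:
            continue  # no message available yet; poll again
        # process the message here, then commit its offset
        consumer.commit_message(message)
        handled += 1
```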
Disclaimer
----------
We're still in the process of setting up this package as a stand-alone. There may be additional work required to run Producers/Consumers and integrate with other applications.
License
-------
Data Pipeline Clientlib is licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0
Contributing
------------
Everyone is encouraged to contribute to Data Pipeline Clientlib by forking the GitHub repository and making a pull request or opening an issue.
Documentation
-------------
The full documentation is at TODO (DATAPIPE-2031|abrar): Link to public servicedocs.
History
-------
0.1.4 (2015-08-12)
++++++++++++++++++
* Defined consumer/producer registration API
0.1.3 (2015-08-10)
++++++++++++++++++
* Added keys kwargs to data pipeline messages
0.1.0 (2015-03-01)
++++++++++++++++++
* First release.