
# Data Pipeline Clientlib


What is it?
-----------
Data Pipeline Clientlib provides an interface to tail and publish to data pipeline topics.

[Read More](https://engineeringblog.yelp.com/2016/07/billions-of-messages-a-day-yelps-real-time-data-pipeline.html)


How to download
---------------
```
git clone git@github.com:Yelp/data_pipeline.git
```
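
Releases (e.g. 0.9.5) are also published to PyPI as `data_pipeline`, so the library can alternatively be installed with pip:
```
pip install data_pipeline
```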


Tests
-----
Running unit tests
```
make -f Makefile-opensource test
```


Configuration
-------------
Include the `data_pipeline` namespace in the `module_env_config` section of your `config.yaml`,
and configure the following values for `kafka_ip`, `zk_ip`, and `schematizer_ip`:

```
module_env_config:
  ...
  - namespace: data_pipeline
    config:
      kafka_broker_list:
        - <kafka_ip>:9092
      kafka_zookeeper: <zk_ip>:2181
      schematizer_host_and_port: <schematizer_ip>:8888
  ...
```
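
Note that `module_env_config` is a list, with one entry per namespace. As an illustrative sketch only (not clientlib API; the library reads this configuration itself), the expected shape can be checked with PyYAML:
```
# Illustrative only: demonstrates the shape of module_env_config.
# The clientlib reads this configuration itself; this sketch just
# inspects the YAML structure with PyYAML.
import yaml

with open('config.yaml') as f:
    conf = yaml.safe_load(f)

dp_entry = next(
    entry for entry in conf['module_env_config']
    if entry.get('namespace') == 'data_pipeline'
)
print(dp_entry['config']['kafka_broker_list'])  # e.g. ['<kafka_ip>:9092']
```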


Usage
-----
Registering a simple schema with the Schematizer service.
```
from data_pipeline.schematizer_clientlib.schematizer import get_schematizer

test_avro_schema_json = {
    "type": "record",
    "namespace": "test_namespace",
    "source": "test_source",
    "name": "test_name",
    "doc": "test_doc",
    "fields": [
        {"type": "string", "doc": "test_doc1", "name": "key1"},
        {"type": "string", "doc": "test_doc2", "name": "key2"}
    ]
}
schema_info = get_schematizer().register_schema_from_schema_json(
    namespace="test_namespace",
    source="test_source",
    schema_json=test_avro_schema_json,
    source_owner_email="test@test.com",
    contains_pii=False
)
```
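
The returned `schema_info` carries the `schema_id` used when constructing messages below. As a hedged sketch, assuming the Schematizer client also exposes a `get_schema_by_id` lookup (verify the exact method name against the clientlib), a registered schema can be fetched back by id:
```
# Hedged sketch: `get_schema_by_id` is assumed here; check the
# schematizer_clientlib for the actual lookup method and fields.
schema = get_schematizer().get_schema_by_id(schema_info.schema_id)
print(schema.schema_json)
```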

Creating a simple Data Pipeline Message from payload data.
```
from data_pipeline.message import Message

message = Message(
    schema_id=schema_info.schema_id,
    payload_data={
        'key1': 'value1',
        'key2': 'value2'
    }
)
```
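
The history below notes that a `keys` kwarg was added to messages in 0.1.3. A hedged sketch, assuming `keys` names the payload fields used as the message key (the exact type and semantics should be checked against the `Message` docstring):
```
# Hedged sketch: the `keys` kwarg comes from the 0.1.3 changelog
# entry below; its exact semantics here are an assumption.
keyed_message = Message(
    schema_id=schema_info.schema_id,
    payload_data={'key1': 'value1', 'key2': 'value2'},
    keys=('key1',)
)
```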

Starting a Producer and publishing messages with it.
```
from data_pipeline.producer import Producer

with Producer() as producer:
    producer.publish(message)
```
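
Publishing is typically asynchronous in clients like this, so when producing a batch it can matter when buffered messages actually reach Kafka. A hedged sketch, assuming the Producer exposes a blocking `flush()` (verify against the Producer's docstring; `messages` is a hypothetical iterable):
```
# Hedged sketch: Producer.flush() is assumed to block until all
# buffered messages have been published; `messages` is hypothetical.
with Producer() as producer:
    for message in messages:
        producer.publish(message)
    producer.flush()
```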

Starting a Consumer named `my_consumer` that listens for messages in all topics
within the `test_namespace` namespace and `test_source` source. In this example,
the consumer consumes a single message at a time, processes it, and commits the
offset.
```
from data_pipeline.consumer import Consumer
from data_pipeline.consumer_source import TopicInSource

consumer_source = TopicInSource("test_namespace", "test_source")
with Consumer(
    consumer_name='my_consumer',
    team_name='bam',
    expected_frequency_seconds=12345,
    consumer_source=consumer_source
) as consumer:
    while True:
        message = consumer.get_message()
        if message is not None:
            # ... do stuff with message ...
            consumer.commit_message(message)
```
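
For higher throughput, consuming in batches avoids a round trip per message. A hedged sketch, assuming the Consumer also exposes batch variants such as `get_messages(count=...)` and `commit_messages(...)` (check the Consumer's docstring for the actual signatures):
```
# Hedged sketch: get_messages/commit_messages are assumed batch
# variants; verify their real signatures in the clientlib.
with Consumer(
    consumer_name='my_consumer',
    team_name='bam',
    expected_frequency_seconds=12345,
    consumer_source=consumer_source
) as consumer:
    while True:
        messages = consumer.get_messages(count=100)
        for message in messages:
            pass  # ... do stuff with each message ...
        if messages:
            consumer.commit_messages(messages)
```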


Disclaimer
-------
We're still in the process of setting up this package as a stand-alone. There may be additional work required to run Producers/Consumers and integrate with other applications.


License
-------
Data Pipeline Clientlib is licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0


Contributing
------------
Everyone is encouraged to contribute to Data Pipeline Clientlib by forking the GitHub repository and making a pull request or opening an issue.



Documentation
-------------

The full documentation is at TODO (DATAPIPE-2031|abrar): Link to public servicedocs.



History
-------

0.1.4 (2015-08-12)
++++++++++++++++++

* Defined consumer/producer registration API

0.1.3 (2015-08-10)
++++++++++++++++++

* Added keys kwargs to data pipeline messages

0.1.0 (2015-03-01)
++++++++++++++++++

* First release.
