Library for operating messaging systems such as Apache Kafka and MQTT with the same API.
Project description
About: SINETStream API
各種メッセージングサーバ(ブローカー)に対して同一のAPIでアクセスすることを可能にするライブラリ。
この文書では SINETStream の Python API の利用方法について記す。
メッセージングシステムに対する操作
概要
メッセージングシステムのブローカーに対して以下の操作を行う機能を提供する。
- ブローカーへの接続
- ブローカーからの切断
- ブローカーへのメッセージ送信
- ブローカーからのメッセージ受信
SINETStream API では以下のメッセージングシステムをサポートする。
- Apache Kafka 2.2.1
- MQTT v3.1, v3.1.1
- Eclipse Mosquitto v1.6.2
将来、上記のメッセージングシステム以外をサポートする場合に備えて、 各メッセージングシステムを処理するモジュールはプラグインとしてSINETStream API本体とは分離できるように実装を行う。
SINETStream APIでは以下の言語をサポートする。
- Java 8
- Python 3.7
以下のプラットフォームでの動作確認を行う。
- CentOS 7.6
- Windows 10
設定ファイル
概要
SINETStream API ではメッセージングシステムに接続するためのパラメータをAPIに 直接指定しなくて済むように設定ファイルを用意する。設定ファイルには各メッセー ジングシステムに接続するためのパラメータを記述する。APIでは設定ファイルの どのパラメータセットを用いるかを指し示すラベル(サービス名)のみを指定する。 設定ファイルのフォーマットは YAML とする。
設定ファイルの例を以下に示す。
service-1:
type: kafka
brokers:
- kafka-1:9092
- kafka-2:9092
- kafka-3:9092
- kafka-4:9092
service-2:
type: mqtt
brokers: 192.168.2.105:1883
username_pw_set:
username: user01
password: pass01
設定ファイルのなかで一つのサービスを記述するブロックは以下のようになる。
{サービス名}:
type: {メッセージングシステムのタイプ}
brokers:
- {ホスト名1}:{ポート番号1}
- {ホスト名2}:{ポート番号2}
value_type: {メッセージの種類}
{その他のパラメータ1}: {設定値1}
{その他のパラメータ2}: {設定値2}
type
にはメッセージングシステムのタイプを指定する。
SINETStream API v0.9 ではtype
に指定可能な値は kafka
と mqtt
である。
brokers
にはメッセージングシステムのブローカーのアドレスを指定する。
複数のサーバを指定する場合は、リストまたは ,
で連結した文字列を指定する。
ブローカーのポート番号は、ホスト名の後に :
でつなげて指定する。
ポート番号の指定を省略した場合は、各メッセージングシステムのデフォルトの
ポート番号を使用する。 kafkaのデフォルトのポート番号は9092になっている。
value_type
にはメッセージの値の種類を指定する。
指定できるのは byte_array
・text
・image
である。指定しなかった場合はメッセージの値はバイト列とみなされる。
メッセージの値の扱いに関しての優先度は以下のようになる。
MessageReader(value_deserializer=)
指定 /MessageWriter(value_serializer=)
指定MessageReader(value_type=)
指定 /MessageWriter(value_type=)
指定- 設定ファイルの
value_type
指定 - デフォルトのバイト列
制限事項: v0.9 では image
をサポートしない
その他のパラメータを指定する箇所は、メッセージングシステム毎に設定できる パラメータが異なる。 以下のようなパラメータが指定できる。詳細については後述する。
- 通信の暗号化に関するパラメータ
- SSL/TLSのための証明書、秘密鍵ファイルのパス
- CA証明書のパス
- ブローカーに接続するための認証情報に関するパラメータ
- ユーザ名
- パスワード
- 通信プロトコルに関するパラメータ
- Kafka APIのバージョン
- MQTTのプロトコルバージョン(3.1, 3.1.1)
- MQTTのトランスポート層(TCP, WebSocket)
配置場所
設定ファイルは、以下の順で検索して最初に見つかったファイルのみを読み込む。
- 環境変数
SINETSTREAM_CONFIG_URL
に指定されたアドレスから YAML ファイルを取得する
- カレントディレクトリの
.sinetstream_config.yml
$HOME/.config/sinetstream/config.yml
- Windows 10 では
~/.config
はC:\Users\{userXX}\.config
となる
- Windows 10 では
その他のパラメータについて
設定ファイルにその他のパラメータとして指定するものについては基本的に抽象化は行わず、 下位層のクラスに指定するパラメータを透過的に指定できるものとする。 ただし、SSL/TLS に関するパラメータは共通部分が多いので統一した指定方法も用意する。
Apache Kafka
基本的に
kafka-pythonの
KafkaConsumer,
KafkaProducer
のコンストラクタ引数をパラメータとして指定できる。
KafkaConsumer
のみ、またはKafkaProducer
のみで意味を持つパラメータについては、
それぞれ MessageReader
, MessageWriter
の対応するクラスのみに影響を与える。
- group_id
- key_deserializer
- key_serializer
- value_deserializer
- value_serializer
- fetch_min_bytes
- fetch_max_wait_ms
- fetch_max_bytes
- max_partition_fetch_bytes
- request_timeout_ms
- max_in_flight_requests_per_connection
- auto_offset_reset
- enable_auto_commit
- auto_commit_interval_ms
- check_crcs
- metadata_max_age_ms
- partition_assignment_strategy
- max_poll_records
- max_poll_interval_ms
- session_timeout_ms
- heartbeat_interval_ms
- receive_buffer_bytes
- send_buffer_bytes
- socket_options
- consumer_timeout_ms
- security_protocol
- ssl_context
- ssl_check_hostname
- ssl_cafile
- ssl_certfile
- ssl_keyfile
- ssl_password
- ssl_crlfile
- ssl_ciphers
- api_version
- api_version_auto_timeout_ms
- connections_max_idle_ms
- metric_reporters
- metrics_num_samples
- metrics_sample_window_ms
- selector
- exclude_internal_topics
- sasl_mechanism
- sasl_plain_username
- sasl_plain_password
- sasl_kerberos_service_name
- sasl_kerberos_domain_name
- sasl_oauth_token_provider
- acks
- compression_type
- retries
- batch_size
- linger_ms
- buffer_memory
- connections_max_idle_ms
- max_block_ms
- max_request_size
- retry_backoff_ms
- reconnect_backoff_ms
- reconnect_backoff_max_ms
- max_in_flight_requests_per_connection
MessageReader
, MessageWriter
で指定されている型と異なる値が設定されている場合は、
設定ファイルを読み込んだときに Warning
を出力してそのパラメータに関する設定を無視する(エラーとせずに処理を継続する)。
Warningの出力先は、ログ設定に従う。
制限事項: v0.9 では Warning
を出力せずそのまま設定するためエラーになる
MQTT(Eclipse Paho)
基本的に
paho.mqtt.client.Client
のコンストラクタと設定関数(XXX_set
)などの引数に指定できるパラメータを指定できる。
- clean_session
- userdata
- qos
- retain
- protocol
- MQTTv31, MQTTv311
- transport
- websockets, tcp
- max_inflight_messages_set
- max_queued_messages_set
- message_retry_set
- ws_set_options
- path
- headers
- tls_set
- ca_certs
- certfile
- keyfile
- cert_reqs
- tls_version
- ciphers
- tls_set_context
- tls_insecure_set
- enable_logger
- username_pw_set
- username
- password
- will_set
- topic
- payload
- qos
- retain
- reconnect_delay_set
- min_delay
- max_delay
- connect
- keepalive
- bind_address
username_pw_set()
などのコンストラクタとは別の関数で設定するパラメータについては、
関数名がキー、その関数で設定するパラメータ名と値の組からなる mapping が値となるように設定ファイルに記述する。
制限事項: v0.9 ではMQTTのパラメータ設定は未実装
例:
service-2:
type: mqtt
brokers: 192.168.2.105:1883
username_pw_set:
username: user01
password: pass01
paho.mqtt.client.Client
で指定されている型と異なる値が設定されている場合は、
設定ファイルを読み込んだときに Warning
を出力してそのパラメータに関する設定を無視する(エラーとせずに処理を継続する)。
Warningの出力先は、ログ設定に従う。
MQTTv311
などの指定は、指定された文字列をSINETStream APIの MQTTプラグインが適切に処理して対応する定数に変換する。
SSL/TLSに関するパラメータ
各メッセージングシステムのパラメータによらず、共通のパラメータ名でSSL/TLSに関する設定値を指定することができる。
- ca_certs
str
: CA証明書ファイルのパス
- certfile
str
: クライアント証明書のパス
- keyfile
str
: クライアントの秘密鍵のパス
- ciphers
str
: SSL/TLS接続に利用可能な暗号を指定する文字列
- check_hostname
bool
: 証明書がブローカーのホスト名と一致することをsslハンドシェイクで検証するかどうかを設定するためのフラグ
共通のパラメータ名で指定する場合は、パラメータと値の組からなる mapping を、キー tls
の値として設定ファイルに記述する。
例:
service-3:
type: kafka
brokers:
- kafka-1:9093
- kafka-2:9093
tls:
ca_certs: ~/.config/sinetstream/ca.pem
certfile: ~/.config/sinetstream/client.pem
keyfile: ~/.config/sinetstream/client.key
共通のパラメータ名と各メッセージングシステム固有のパラメータ名との対応を以下の表に示す。
SSL/TLS | 型 | Kafka | MQTT |
---|---|---|---|
ca_certs | str(Path) | ssl_cafile | tls_set:ca_certs |
certfile | str(Path) | ssl_certfile | tls_set:certfile |
keyfile | str(Path) | ssl_keyfile | tls_set:keyfile |
ciphers | str | ssl_ciphers | tls_set:ciphers |
check_hostname | bool | ssl_check_hostname | tls_insecure |
1つのサービスに対して共通のパラメータ名とメッセージングシステム固有のパラメータ名の両方が指定された場合は、 メッセージングシステム固有のパラメータの設定値の方が優先される。
SSL/TLSの共通パラメータが1つでも指定された場合、
SSL/TLS による接続を有効にするメッセージングシステム固有のパラメータを暗黙的に設定する。
このようなパラメータとして、Kakfaの security_protocol
がある。
SSL/TLSの共通パラメータが指定された場合はsecurity_protocol
の値をSSL
に設定する。
SSL/TLSのパラメータを指定せずにSSL/TLSによる接続を有効にしたい場合はtls: true
を指定する。
この場合はメッセージングシステム固有のパラメータが暗黙的に設定される。
例:
service-3:
type: kafka
brokers:
- kafka-1:9093
- kafka-2:9093
tls: true
コンストラクタのデフォルト値
SINETStream APIの MessageReader, MessageWriterのコンストラクタ引数のデフォルト値を設定ファイルで指定することができる。
制限事項: v0.9 ではコンストラクタ引数のデフォルト値を設定ファイルで指定する機能は未実装
指定できるパラメータを以下に示す。
- consistency
- client_id
- value_type
- value_deserializer
- value_serializer
- topic
MessageReader, MessageWriterのどちらかにしか存在しないパラメータは、対応するクラスのみに影響を与える。
consistency
と MQTTのqos
などのように対応関係のあるパラメータの両方が設定ファイルに記述されていた場合は、
メッセージングシステム固有のパラメータの方が優先される。
例えばconsistency
とqos
の両方が記述されていた場合は qos
に指定されている値の方が優先する。
MessageReader, MessageWriterのコンストラクタの引数に値を指定した場合は、 コンストラクタの引数の値が設定ファイルに記述した値に優先する。 パラメータの優先度を高いほうから順に並べると以下のようになる。
- コンストラクタの引数
- 設定ファイルのメッセージングシステム固有のパラメータ指定
- 設定ファイルのコンストラクタのデフォルト値
Python API クラス一覧
- sinetstream.MessageReader
- メッセージングシステムからメッセージを取得するクラス
- sinetstream.MessageWriter
- メッセージングシステムにメッセージを送信するクラス
- sinetstream.Message
- 送受信されるメッセージを表すクラス
- sinetstream.SinetError
- SINETStreamの例外クラス全体の親クラス
使用例
MessageReader
サービス名: service-1
のトピック topic-001
からメッセージを取得する例。
from sinetstream import MessageReader
reader = MessageReader('service-1', 'topic-001')
with reader as f:
for msg in f:
print(msg.value)
MessageReaderオブジェクトは with 文をサポートしている。
ブロックに入る際に暗黙的にメッセージングシステムとの接続を行い、
ブロックから抜け出す際にメッセージングシステムとの接続のクローズ処理を
行う。 またwith文のターゲットf
から取得できるイテレータを通してメッセージングシステムからメッセージを取得することができる。
デフォルトのパラメータではメッセージ取得の最大待ち時間は inf
に設定されているため、
通常の処理でfor
ループから抜け出すことはない。
for
ループから抜け出すことができるようにするにはMessageReaderコンストラクタの引数でメッセージの最大待ち時間の指定するか、
シグナル処理を行う必要がある。
MessageWriter
サービス名: service-2
のトピック topic-002
にメッセージを送る例。
from sinetstream import MessageWriter
writer = MessageWriter('service-2', 'topic-002')
with writer as f:
f.publish(b'test message 001')
f.publish(b'test message 002')
MessageWriterオブジェクトは with 文をサポートしている。 ブロックに入る際に暗黙的にメッセージングシステムとの接続を行い、 ブロックから抜け出す際にメッセージングシステムとの接続のクローズ処理を行う。
value_type
と value_serializer
のどちらも設定しない場合は publish() にメッセージの値として渡すパラメータはバイト列を指定する必要がある。
バイト列以外のオブジェクトを渡す場合は MessageWriter() のパラメータ value_type
でメッセージの種類を指定するか value_serializer
でメッセージの直列化を行う関数を指定する必要がある。
MessageReaderクラス
MessageReader()
MessageReaderクラスのコンストラクタ。
MessageReader(
service,
topics=None,
consistency=AT_MOST_ONCE,
client_id=DEFAULT_CLIENT_ID,
value_type=None,
value_deserializer=None,
receive_timeout_ms=float("inf"),
**kwargs)
パラメータ:
service
- サービス名
- 設定ファイルに対応するサービス名が記述されている必要がある
topics
- トピック名
str
またはlist
を指定できる- 複数のトピックをsubscribeする場合は
list
を指定すること - 指定を行わなかった場合は設定ファイルに記述されている値が用いられる
consistency
- AT_MOST_ONCE (=0)
- AT_LEAST_ONCE (=1)
- EXACTLY_ONCE (=2)
client_id
- クライアントの名前
- DEFAULT_CLIENT_ID, None, 空文字のいずれかが指定された場合はライブラリが値を自動生成する
- 自動生成した値は、このオブジェクトのプロパティとして取得できる
value_type
- メッセージの値の種類
- None(デフォルト値)が指定された場合は何もしない
value_deserializer
- メッセージのバイト列から値を復元するために使用する関数
- None(デフォルト値)が指定された場合は何もしない
receive_timeout_ms
- メッセージの到着を待つ最大時間(ms)
- 一度タイムアウトするとこのコネクションからメッセージを読み込むことはできない。
kwargs
- 各メッセージングシステムを操作する際に用いるオブジェクトに設定する個別のパラメータ
kwargs
に指定があった場合は、
メッセージングシステムを操作するための下位層のクラス kafka.KafkConsumer
などのコンストラクタにパラメータをそのまま渡す事ができる。
実際にどのようなパラメータを渡せるかは下位のライブラリに依存する。
このクラスでは指定されたパラメータの妥当性のチェックは行わず、そのままの形で下位のクラスに渡すこととする。
service
以外の引数は、設定ファイルにデフォルト値を記述することができる。
設定ファイルとコンストラクタの引数の両方に同じパラメータの値を指定した場合はコンストラクタの引数に指定した値が優先する。
制限事項: v0.9 ではコンストラクタ引数のデフォルト値を設定ファイルで指定する機能は未実装
制限事項: v0.9 ではKafkaのconsistency
にEXACTLY_ONCE
を指定してもAT_LEAST_ONCE
にダウングレードする
例外:
- NoServiceError
service
に指定した値に対応するサービスが設定ファイルに存在しない
- NoConfigError
- 設定ファイルが存在しない、あるいは読み込めない
- InvalidArgumentError
- 指定した引数の形式が正しくない。
consistency
の値が範囲外、topic
名として許容されない文字列などの場合
- 指定した引数の形式が正しくない。
- UnsupportedServiceTypeError
- 設定ファイルに指定されている
type
に対応するメッセージングシステムのプラグインがインストールされていない
- 設定ファイルに指定されている
パラメータ:
revice_timeout_ms
- メッセージの到着を待つ最大時間(ms)
- 一度タイムアウトするとこのコネクションからメッセージを読み込むことはでない。
戻り値:
メッセージングシステムとの接続状態を保持しているハンドラ
例:
reader = MessageReader('service-1', 'topic-001')
with reader as f:
for msg in f:
print(msg.value.decode('utf-8'))
例外:
- ConnectionError
- ブローカーへの接続がエラーになった
- AlreadyConnectedError
- 既に接続状態のオブジェクトに対して、再度 open() を呼び出した場合
プロパティ
コンストラクタの引数に指定した service
などの値は読み取りのみのプロパティとしてアクセスできる。
client_id
などのようにライブラリが値を自動生成するものについては、生成した値がプロパティとして取得できる。
読み取り位置の変更
メッセージングシステムによっては読み取り位置を変更できるものがある。 実際 Apache KafkaではConsumerがメッセージを読み取る位置を変更できる。
MessageReaderのサービスが Apache Kafka だった場合、読み取り位置の変更を指定することができる
- seek_to_beginning()
- seek_to_end()
例:
reader = MessageReader('service-1', 'topic-001')
with reader.open() as f:
f.seek_to_beginning()
for msg in f:
print(msg.value.decode('utf-8'))
開発中のデバッグ、テストを目的の非公開API。将来のリリースで削除する可能性がある。
MessageWriterクラス
MessageWriter()
MessageWriter(
service,
topic,
consistency=AT_MOST_ONCE,
client_id=DEFAULT_CLIENT_ID,
value_serializer=None,
**kwargs)
MessageWriterクラスのコンストラクタ。
パラメータ:
service
- サービス名
- 設定ファイルに対応するサービス名が記述されている必要がある
topic
- トピック名
- 指定を行わなかった場合は設定ファイルに記述されている値が用いられる
consistency
- AT_MOST_ONCE (=0)
- AT_LEAST_ONCE (=1)
- EXACTLY_ONCE (=2)
client_id
- クライアントの名前
- DEFAULT_CLIENT_ID, None, 空文字のいずれかが指定された場合はライブラリが値を自動生成する
value_type
- メッセージの値の種類
- None(デフォルト値)が指定された場合は何もしない
value_serializer
- メッセージの値をバイト列に変換するための関数
- None(デフォルト値)が指定された場合は何もしない
kwargs
- 各メッセージングシステムを操作する際に用いるオブジェクトに設定する個別のパラメータ
kwargs
に指定があった場合は、
メッセージングシステムを操作するための下位層のクラス kafka.KafkProducer
などのコンストラクタにパラメータをそのまま渡す事ができる。
実際にどのようなパラメータを渡せるかは下位のライブラリに依存する。
このクラスでは指定されたパラメータの妥当性のチェックは行わず、そのままの形で下位のクラスに渡すこととする。
service
以外の引数は、設定ファイルにデフォルト値を記述することができる。
設定ファイルとコンストラクタの引数の両方に同じパラメータの値を指定した場合はコンストラクタの引数に指定した値が優先する。
制限事項: v0.9 ではコンストラクタ引数のデフォルト値を設定ファイルで指定する機能は未実装
制限事項: v0.9 ではKafkaのconsistency
にEXACTLY_ONCE
を指定してもAT_LEAST_ONCE
にダウングレードする
例外:
- NoServiceError
service
に指定した値に対応するサービスが設定ファイルに存在しない
- NoConfigError
- 設定ファイルが存在しない、あるいは読み込めない
- InvalidArgumentError
- 指定した引数の形式が正しくない。
consistency
の値が範囲外、topic
名として許容されない文字列などの場合
- 指定した引数の形式が正しくない。
- UnsupportedServiceTypeError
- 設定ファイルに指定されている
type
に対応するメッセージングシステムのプラグインがインストールされていない
- 設定ファイルに指定されている
戻り値:
メッセージングシステムとの接続状態を保持しているハンドラ
例:
writer = MessageWriter('service-2', 'topic-002')
with writer as f:
f.publish(b'test message 001')
f.publish(b'test message 002')
例外:
- ConnectionError
- ブローカーへの接続がエラーになった
- AlreadyConnectedError
- 既に接続状態のオブジェクトに対して、再度 open() を呼び出した場合
プロパティ
コンストラクタの引数に指定した service
などの値は読み取りのみのプロパティとしてアクセスできる。
client_id
などのようにライブラリが値を自動生成するものについては、生成した値がプロパティとして取得できる。
Messageクラス
各メッセージングシステムのメッセージオブジェクトのラッパークラス。
プロパティ
全て読み取りアクセスのみ。
- value
- メッセージの値部分
- Kafka では raw.value, MQTT では raw.payload
- デフォルトではメッセージの値のバイト列が得られる
- MessageReaderに value_deserializer が設定されている場合は、その関数によってバイト列から変換されたオブジェクトが得られる
- topic
- トピック名
- raw
- 各メッセージングシステムのメッセージオブジェクト
例外一覧
例外 | 発生元メソッド | 理由 |
---|---|---|
NoServiceError |
MessageReader() , MessageWriter() |
指定したサービス名が設定ファイルで定義されていない。 |
UnsupportedServiceTypeError |
MessageReader() , MessageWriter() |
サービスの定義で指定されているサービスタイプをサポートしていない。または対応するプラグインがインストールされていない。 |
NoConfigError |
MessageReader() , MessageWriter() |
設定ファイルがない。 |
InvalidArgumentError |
MessageReader() , MessageWriter() , MqttReader.open() , MqttWriter.open() , MqttWriter.publish() |
引数が間違っている。 |
ConnectionError |
KafkaReader.open() , KafkaWriter.open() , MqttReader.open() , MqttWriter.open() , MqttWriter.publish() |
ブローカーとの接続に問題がある。 |
AlreadyConnectedError |
KafkaReader.open() , KafkaWriter.open() , MqttReader.open() , MqttWriter.open() |
すでにブローカと接続している。 |
依存関係にあるライブラリ
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for sinetstream-0.9.8b8-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | bb32886559a9f7e71608dbad788de07e70fb19e53912f62c8208f580479221ea |
|
MD5 | 7457047d808ab3ce9919861fc0f089cb |
|
BLAKE2b-256 | 61a460a92fca02d0017dceb2ea16f5c10bd87b4c90ce1c861dd1fe98014a3c0e |