Skip to main content

Python implementation of application support layer.

Project description

is-wire

PyPI Build Python suport Downloads

Pub/Sub middleware for the is architecture (python implementation)

Installation

Install the wire package using pip or pipenv:

  pip install --user is-wire
  # or
  pipenv install --user is-wire

Usage

Prepare environment

In order to send/receive messages an amqp broker is necessary, to create one simply run:

docker run -d --rm -p 5672:5672 -p 15672:15672 rabbitmq:3.7.6-management

Basic send/receive

Create a channel to connect to a broker, create a subscription and subscribe to desired topics to receive messages:

from is_wire.core import Channel, Subscription

# Connect to the broker
channel = Channel("amqp://guest:guest@localhost:5672")

# Subscribe to the desired topic(s)
subscription = Subscription(channel)
subscription.subscribe(topic="MyTopic.SubTopic")
# ... subscription.subscribe(topic="Other.Topic")

# Blocks forever waiting for one message from any subscription
message = channel.consume()
print(message)

Create and publish messages:

from is_wire.core import Channel, Message

# Connect to the broker
channel = Channel("amqp://guest:guest@localhost:5672")

message = Message()
# Body is a binary field therefore we need to encode the string
message.body = "Hello!".encode('latin1')

# Broadcast message to anyone interested (subscribed)
channel.publish(message, topic="MyTopic.SubTopic")

Serialize/Deserialize protobuf objects:

from is_wire.core import Channel, Message, Subscription, ContentType
from google.protobuf.struct_pb2 import Struct

channel = Channel("amqp://guest:guest@localhost:5672")

subscription = Subscription(channel)
subscription.subscribe(topic="MyTopic.SubTopic")

struct = Struct()
struct.fields["apples"].string_value = "red"

message = Message()
message.content_type = ContentType.JSON # or ContentType.PROTOBUF
message.pack(struct) # Serialize the struct into the message body

channel.publish(message, topic="MyTopic.SubTopic")

# Blocks forever waiting for the message we just sent
received_message = channel.consume()
# Deserialize the struct from the message body
received_struct = received_message.unpack(Struct) 

# Check that they are equal
assert struct == received_struct

Basic Request/Reply

Create a RPC Server:

from is_wire.core import Channel, StatusCode, Status
from is_wire.rpc import ServiceProvider, LogInterceptor
from google.protobuf.struct_pb2 import Struct
import time


def increment(struct, ctx):
    if struct.fields["value"].number_value < 0:
        # Return error to client
        return Status(StatusCode.INVALID_ARGUMENT, "Number must be positive")

    time.sleep(0.2)  # Simulate work
    struct.fields["value"].number_value += 1.0
    # Return normal reply
    return struct


channel = Channel("amqp://guest:guest@localhost:5672")

provider = ServiceProvider(channel)
logging = LogInterceptor()  # Log requests to console
provider.add_interceptor(logging)

provider.delegate(
    topic="MyService.Increment",
    function=increment,
    request_type=Struct,
    reply_type=Struct)

provider.run() # Blocks forever processing requests

Send a request to the RPC Server:

from is_wire.core import Channel, Message, Subscription
from google.protobuf.struct_pb2 import Struct
import socket

channel = Channel("amqp://guest:guest@localhost:5672")
subscription = Subscription(channel)

# Prepare request
struct = Struct()
struct.fields["value"].number_value = 1.0
request = Message(content=struct, reply_to=subscription)
# Make request
channel.publish(request, topic="MyService.Increment")

# Wait for reply with 1.0 seconds timeout
try:
    reply = channel.consume(timeout=1.0)
    struct = reply.unpack(Struct)
    print('RPC Status:', reply.status, '\nReply:', struct)
except socket.timeout:
    print('No reply :(')

Multiples requests can be done throughout same client. To distinguish which reply is related to each request, you can use the correlation_id. This attribute is always set when a Message is published containing reply_to parameter, which means that it was a RPC request. Example below shows how to deal with it.

from is_wire.core import Channel, Message, Subscription
from google.protobuf.struct_pb2 import Struct
import socket

channel = Channel("amqp://guest:guest@localhost:5672")
subscription = Subscription(channel)

# Prepare first request
struct = Struct()
struct.fields["value"].number_value = 1.0
request_1 = Message(content=struct, reply_to=subscription)

# Prepare second request
struct = Struct()
struct.fields["value"].number_value = 2.0
request_2 = Message(content=struct, reply_to=subscription)

# Make requests
channel.publish(request_1, topic="MyService.Increment")
channel.publish(request_2, topic="MyService.Increment")

# Wait for replies with 1.0 seconds timeout
n_replies = 0
while n_replies < 2:
    try:
        reply = channel.consume(timeout=1.0)
        struct = reply.unpack(Struct)
        if reply.correlation_id == request_1.correlation_id:
            n_replies += 1
            print('First Request\nRPC Status:', reply.status, '\nReply:', struct)
        elif reply.correlation_id == request_2.correlation_id:
            n_replies += 1
            print('Second Request\nRPC Status:', reply.status, '\nReply:', struct)
        else:
            print('Unexpected message')
    except socket.timeout:
        print('No reply :(')

Tracing messages

This middleware uses opencensus as instrumentation library. Latest versions of opencensus released separate packages to integrate with different frameworks and tracing collector tools. When interacting with services implemented with either the C++ or Python of is-wire, we recommend to use Zipkin to collect the tracing data. To do so, use the latest version of OpenCensus Zipkin Exporter.

Instantiate an Exporter to trace requests:

from is_wire.core import AsyncTransport
from opencensus.ext.zipkin.trace_exporter import ZipkinExporter

# Create an exporter, change values accordingly to match your zipkin server
exporter = ZipkinExporter(
    service_name="MyService",
    host_name="localhost",
    port=9411,
    transport=AsyncTransport,
)

Then create a tracer and start tracing:

from is_wire.core import Channel, Message, Tracer

channel = Channel("amqp://guest:guest@localhost:5672") 
tracer = Tracer(exporter)

with tracer.span(name="publish") as span:
    message = Message()
    # ...
    # Propagates the current tracing context
    message.inject_tracing(span) 
    channel.publish(message, topic="Any.Topic")

Or create a tracing interceptor and pass it to your ServiceProvider:

from is_wire.rpc import TracingInterceptor, ServiceProvider

channel = Channel("amqp://guest:guest@localhost:5672") 

provider = ServiceProvider(channel)

tracing = TracingInterceptor(exporter)  # automatically trace requests
provider.add_interceptor(tracing)

Development

Tests

# prepare environment
pip install --user tox
docker run -d --rm -p 5672:5672 -p 15672:15672 rabbitmq:3.7.6-management

# run all the tests
tox

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

is_wire-1.2.1.tar.gz (17.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

is_wire-1.2.1-py3-none-any.whl (19.8 kB view details)

Uploaded Python 3

File details

Details for the file is_wire-1.2.1.tar.gz.

File metadata

  • Download URL: is_wire-1.2.1.tar.gz
  • Upload date:
  • Size: 17.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.11.3

File hashes

Hashes for is_wire-1.2.1.tar.gz
Algorithm Hash digest
SHA256 7f2930fe4f9d101db68a609b6992ed856e82083a8d418c521987dad087585346
MD5 1fc99a33f586b885ea4f9d2202ab05ab
BLAKE2b-256 9a32ff30ac1cb2ffe8573ce52c779688a310ad884fce274b0b1f5ed8ecad1f17

See more details on using hashes here.

File details

Details for the file is_wire-1.2.1-py3-none-any.whl.

File metadata

  • Download URL: is_wire-1.2.1-py3-none-any.whl
  • Upload date:
  • Size: 19.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.11.3

File hashes

Hashes for is_wire-1.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 ce32caa600d8ee7569e6e38b25097c49fa41d37949134439dbd0c28955966f1b
MD5 f0633a15a8df1d8e221909ca0851d009
BLAKE2b-256 0c5d4967f07f45b882b898ba2d78a427089c5eb4e202c111cc98851b8c465aa5

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page