event driven I/O for deities
Traffic provides kqueue(2)-based, event-driven I/O for Python. However, in addition to listening for I/O events, Traffic performs the corresponding I/O operation before propagating the notification. In essence, Traffic transforms can-read and can-write events into did-read and did-write events.
Traffic is a work in progress. The APIs may change without warning.
Traffic is designed to maximize efficiency by performing I/O operations in bulk. By grouping the collection of I/O events and the execution of I/O operations that work directly with pre-allocated buffer objects, Traffic is capable of performing a fair amount of work while the GIL is released. This batching of operations also reduces the number of times that the GIL is released and acquired within a process.
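The can-read to did-read transformation and the batching described above can be sketched with the standard library alone. This is only an illustration of the shape of the idea, not Traffic's implementation: it uses the portable `selectors` module rather than kqueue(2) directly, the function name `collect_did_reads` is hypothetical, and pure Python code like this does not release the GIL for the whole batch the way a C loop can.

```python
import selectors
import socket

def collect_did_reads(selector, timeout=0):
    """One pass: gather every can-read event, perform the reads, return the results."""
    results = []
    for key, _events in selector.select(timeout):
        view = key.data                      # pre-allocated memoryview for this socket
        count = key.fileobj.recv_into(view)  # the read is performed here, not by the caller
        results.append((key.fileobj, view[:count]))
    return results

selector = selectors.DefaultSelector()
pairs = [socket.socketpair() for _ in range(3)]
for index, (reader, writer) in enumerate(pairs):
    reader.setblocking(False)
    # Each reader gets its buffer up front, so no allocation happens per event.
    selector.register(reader, selectors.EVENT_READ, data=memoryview(bytearray(64)))
    writer.sendall(b"message %d" % index)

# The caller receives data that has already been read ("did-read"), in one batch.
batch = collect_did_reads(selector, timeout=1)
payloads = sorted(bytes(view) for _sock, view in batch)

selector.close()
for reader, writer in pairs:
    reader.close()
    writer.close()
```

The caller never sees a bare readiness notification; it only sees buffers that already contain the transferred data.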
Traffic aims to be the I/O foundation for frameworks. Direct use of Traffic will require a fair amount of boilerplate.
Transferring Data with Traffic
Making a connection is easy:
    import traffic.libkqueue
    import traffic.libloop

    s1, s2 = traffic.libkqueue.Octets.connect_stream_socketpair()
Traffic manages each channel, a “Transit”, as a distinct, half-duplex object. The example above invokes socketpair(2), so the results, s1 and s2, are each a tuple of two traffic.libkqueue.Octets instances. The receive half is the first item and the send half is the second: I, then O, of I/O.
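To make the shape of those results concrete, here is a minimal stand-in built on `socket.socketpair()`. The `Half` type and the `socketpair_halves` function are hypothetical names used only for illustration; they model the (receive, send) tuple layout described above and say nothing about how Octets is actually implemented.

```python
import socket
from typing import NamedTuple

class Half(NamedTuple):
    """Hypothetical stand-in for one half-duplex endpoint."""
    sock: socket.socket
    direction: str  # "receive" or "send"

def socketpair_halves():
    """Return two (receive, send) tuples, one per end of a socket pair."""
    a, b = socket.socketpair()
    return (Half(a, "receive"), Half(a, "send")), (Half(b, "receive"), Half(b, "send"))

s1, s2 = socketpair_halves()
s1read, s1write = s1  # I, then O
s2read, s2write = s2

# Data written through one end's send half arrives at the other end's receive half.
s1write.sock.sendall(b"ping")
received = s2read.sock.recv(16)

s1read.sock.close()
s2read.sock.close()
```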
In order for these objects to perform actual transfers, they must be attached to a traffic.libkqueue.Traffic instance, which manages the kqueue subscriptions:
    # Note: this rebinds the name `traffic` from the package to the Traffic instance.
    traffic = traffic.libkqueue.Traffic()
    s1read, s1write = s1
    s2read, s2write = s2
Attachment is performed with the traffic.libkqueue.Traffic.acquire method. Once acquired, the Transit is owned by the Traffic object, and will not be released until termination:
    traffic.acquire(s1read)
    traffic.acquire(s1write)
    traffic.acquire(s2read)
    traffic.acquire(s2write)
Now that the traffic instance has acquired the Transits, transfers can occur as soon as resources are available. Each Transit needs to acquire a resource to transfer: some data to send, or some space to receive into. As with Traffic, this is done with an acquire method, traffic.libkqueue.Octets.acquire:
    # `traffic` now names the Traffic instance, so import Octets by name.
    from traffic.libkqueue import Octets

    mutable_buffer = Octets.rallocate(128)  # memoryview(bytearray()) object
    s1read.acquire(mutable_buffer)
    s1write.acquire(b'Hello, that side of the world!')
Likewise, the other end needs to acquire resources:
    from traffic.libkqueue import Octets  # `traffic` names the Traffic instance here

    s2read.acquire(Octets.rallocate(128))
    s2write.acquire(b'Nobody is home! Go Away!')
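The resource model described above (a send half needs data, a receive half needs space) can be sketched roughly as follows. `SendHalf` and `ReceiveHalf` are hypothetical classes written against plain sockets; they are an assumption about the general idea, not a description of how Octets works internally.

```python
import socket

class SendHalf:
    """Hypothetical send-only half; its resource is the data still to be sent."""

    def __init__(self, sock):
        self.sock = sock
        self.resource = None

    def acquire(self, data):
        self.resource = memoryview(data)

    def transfer(self):
        """Send what the socket accepts; return it, or None when there is no resource."""
        if self.resource is None:
            return None
        count = self.sock.send(self.resource)
        sent, rest = self.resource[:count], self.resource[count:]
        self.resource = rest if len(rest) else None  # exhausted once all data is sent
        return bytes(sent)

class ReceiveHalf:
    """Hypothetical receive-only half; its resource is the space left to fill."""

    def __init__(self, sock):
        self.sock = sock
        self.resource = None

    def acquire(self, buffer):
        self.resource = memoryview(buffer)

    def transfer(self):
        """Read into the remaining space; return the new bytes, or None without space."""
        if self.resource is None:
            return None
        count = self.sock.recv_into(self.resource)
        filled, rest = self.resource[:count], self.resource[count:]
        self.resource = rest if len(rest) else None  # exhausted once the buffer is full
        return bytes(filled)

left, right = socket.socketpair()
writer, reader = SendHalf(left), ReceiveHalf(right)

idle = reader.transfer()  # no buffer acquired yet, so nothing is read
writer.acquire(b"Hello, that side of the world!")
reader.acquire(bytearray(128))
sent = writer.transfer()
received = reader.transfer()

left.close()
right.close()
```

Without an acquired resource a half simply does nothing, which is why every Transit in the walkthrough acquires data or a buffer before any transfer can happen.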
All of the Transits now have the resources necessary to perform a transfer. For illustrative purposes, a loop running in a separate thread will place events into a Queue:
    import queue
    import threading
    # `traffic` names the Traffic instance here, so import the loop function by name.
    from traffic.libloop import loop

    q = queue.Queue(6)
    deliver = q.put
    thread = threading.Thread(target=loop, args=(deliver, traffic))
    thread.start()
The loop is now performing transfers and delivering sequences of traffic.libloop.Activity instances using q.put. By reading the sequences with q.get, the transfers can be processed while traffic.libloop.loop performs the next set of transfers for subsequent processing:
    io = q.get()
    while io is not None:
        for x in io:
            if x.demand is not None:
                # It's a write Transit that has sent all its data.
                x.transit.terminate()
            else:
                print(x)
        if traffic.volume == 0:
            # all transits have terminated
            traffic.terminate()
        io = q.get()
    print("Complete!")
The output of the above should look something like:
    Activity(transit=<traffic.libkqueue.Octets object at 0xDEADBEEF1>, termination=None, transferred=bytearray(b'Hello, that side of the world!'), demand=None)
    Activity(transit=<traffic.libkqueue.Octets object at 0xDEADBEEF2>, termination=None, transferred=bytearray(b'Nobody is home! Go Away!'), demand=None)
    Activity(transit=<traffic.libkqueue.Octets object at 0xDEADBEEF3>, termination=<Status>, transferred=None, demand=None)
    Activity(transit=<traffic.libkqueue.Octets object at 0xDEADBEEF4>, termination=<Status>, transferred=None, demand=None)
    Activity(transit=<traffic.libkqueue.Octets object at 0xDEADBEEF5>, termination=<Status>, transferred=None, demand=None)
    Activity(transit=<traffic.libkqueue.Octets object at 0xDEADBEEF6>, termination=<Status>, transferred=None, demand=None)
Naturally, the code in this example doesn’t really benefit from having the loop performed in a separate thread. However, the purpose of the example is to illustrate the anticipated form of use. With numerous connections, this model is efficient because transfers are performed concurrently with processing. The concurrency itself does not contrast much with traditional multi-threaded Python processes: most Python interfaces to underlying C functions release the GIL and allow true concurrency with regard to the processing performed by the C function. The contrast is in GIL contention and payload delivery: Traffic loops can perform multiple I/O operations without the GIL being held and deliver the data into an actual Python object.
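The producer and consumer shape of this model can be tried without Traffic at all. In the sketch below, `loop_sketch` is a hypothetical stand-in for an I/O loop: it delivers made-up batches of (name, data) pairs and then a `None` sentinel, which is an assumption chosen to match the `while io is not None` consumer shown earlier.

```python
import queue
import threading

def loop_sketch(deliver, batches):
    """Hypothetical stand-in for an I/O loop: deliver each batch, then a sentinel."""
    for batch in batches:
        deliver(batch)  # blocks while the bounded queue is full
    deliver(None)       # tells the consumer that no more activity will arrive

# Illustrative data only; a real loop would deliver the results of actual transfers.
batches = [
    [("s1read", b"Hello")],
    [("s2read", b"Nobody"), ("s1write", None)],
]

q = queue.Queue(6)
thread = threading.Thread(target=loop_sketch, args=(q.put, batches))
thread.start()

processed = []
io = q.get()
while io is not None:
    for name, transferred in io:
        processed.append((name, transferred))  # processing overlaps with the producer
    io = q.get()
thread.join()
```

Because the queue is bounded, a slow consumer eventually makes `q.put` block, so the producing thread cannot run arbitrarily far ahead of processing.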
The Usage chapter of the documentation covers the details of what is summarized here. There are important facets not directly addressed in this introduction, notably the use of buffer exhaustion for flow control.
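As a rough intuition for buffer exhaustion as flow control, consider a receiver that only reads as much as the buffer it was given can hold. This sketch uses plain sockets and is one plausible reading of the idea, not a description of Traffic's behavior; the Usage chapter remains the authority.

```python
import socket

sender, receiver = socket.socketpair()
sender.sendall(b"0123456789")

chunks = []
for _ in range(3):
    # Each pass models acquiring a fresh 4-byte buffer after the previous one was exhausted.
    view = memoryview(bytearray(4))
    count = receiver.recv_into(view)
    # Anything that did not fit stays queued in the kernel until more space is supplied.
    chunks.append(bytes(view[:count]))

sender.close()
receiver.close()
```

The receiver decides how quickly data is consumed simply by choosing when, and how much, buffer space to supply. Unread data accumulates in the socket buffers and, once they fill, the sending side can no longer make progress.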