Default utilities for the dareplane platform
Dareplane Python Utils
This module includes Python utilities which are used within the Dareplane framework. It contains shared functionality which can be reused across multiple modules. This currently includes:
- A DefaultServer, which will be loaded and extended within each module to implement the Dareplane API
- logging, which contains the standard formatting and a SocketHandler which is modified to send JSON representations of the logging records to the default logging server port (9020). This is used to enable cross-process logging.
- A StreamWatcher implementation, which is a utility class to query a single LSL stream into a ring buffer.
Default Dareplane Server
This default server is used by all Dareplane Python modules as a starting
point for their TCP socket. The idea is to have a single source for common
functionality and patch everything that is module-specific on top of this.
Functional incarnations
Currently there are two functional incarnations of servers:
- Spawning functionality from the server in a separate thread, which is linked via events to the main thread (usually the server).
- Spawning a subprocess for running functionality. This is currently necessary for running psychopy, as it cannot be run from outside the main thread.
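The first incarnation can be illustrated with a minimal sketch based only on the standard library. The names `worker`, `stop_event` and `results` are hypothetical and are not part of dareplane_utils; the sketch only shows how a thread can be linked to the main thread via a `threading.Event`. The second incarnation would replace the thread with a subprocess, for example via `subprocess.Popen`.

```python
import threading
import time


def worker(stop_event: threading.Event, results: list) -> None:
    """Hypothetical module functionality that runs until a stop is signalled."""
    while not stop_event.is_set():
        results.append(time.perf_counter())  # stand-in for the real work
        stop_event.wait(0.01)  # sleep, but wake up early if a stop is requested


stop_event = threading.Event()
results: list = []
thread = threading.Thread(target=worker, args=(stop_event, results), daemon=True)
thread.start()

time.sleep(0.05)  # the main thread (the server) keeps handling commands meanwhile
stop_event.set()  # e.g. triggered by a stop command arriving at the server
thread.join(timeout=1.0)
```

Using an event rather than a shared boolean lets the worker sleep between iterations and still react promptly to the stop request.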
Logging
The logging tools provide two main entry points: from dareplane_utils.logging.logger import get_logger, which is used to get a logger with the default configuration, and from dareplane_utils.logging.server import LogRecordSocketReceiver, which is used to spawn a server for consolidating the logs of different processes.
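How a SocketHandler can be modified to send JSON instead of pickled records is sketched below using only the standard library. The class name `JsonSocketHandler`, the selected record fields and the length-prefixed framing are assumptions for illustration; the actual wire format used by dareplane_utils may differ.

```python
import json
import logging
import logging.handlers
import struct


class JsonSocketHandler(logging.handlers.SocketHandler):
    """Hypothetical handler: serializes log records as JSON instead of pickle."""

    def makePickle(self, record: logging.LogRecord) -> bytes:
        payload = {
            "name": record.name,
            "levelname": record.levelname,
            "msg": record.getMessage(),
            "created": record.created,
        }
        data = json.dumps(payload).encode("utf-8")
        # Keep the 4-byte big-endian length prefix used by the stdlib SocketHandler
        return struct.pack(">L", len(data)) + data


# The socket is only opened when a record is emitted, so no server is needed here
handler = JsonSocketHandler("127.0.0.1", 9020)  # 9020: default logging server port
record = logging.LogRecord(
    "demo", logging.INFO, "demo.py", 1, "hello %s", ("world",), None
)
frame = handler.makePickle(record)
decoded = json.loads(frame[4:].decode("utf-8"))
```

A receiving server would read the 4-byte length first, then that many bytes, and decode them with `json.loads`.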
StreamWatcher
StreamWatchers are a convenient utility around LSL stream inlets. They are basically a ring buffer for reading data into a numpy array. StreamWatchers are:
- initialized with a target stream name and a buffer size in seconds specified by buffer_size_s
- connected to the target LSL stream
- updated to fetch the latest data (usually done in a loop)
initialize a StreamWatcher
from dareplane_utils.stream_watcher.lsl_stream_watcher import StreamWatcher
STREAM_NAME = "my_stream"
BUFFER_SIZE_S = 5  # the required buffer size will be calculated from the LSL
                   # stream's metadata
sw = StreamWatcher(
    STREAM_NAME,
    buffer_size_s=BUFFER_SIZE_S,
)
connect to the stream
# Either use the self.name or a provided identifier dict to hook up to an LSL stream
sw.connect_to_stream()
update
sw.update()
Update will call the following method:
def update(self):
    """Look for new data and update the buffer"""
    samples, times = self.inlet.pull_chunk()
    self.add_samples(samples, times)
    self.samples = samples
    self.n_new += len(samples)
Getting data
To get the data from the StreamWatcher you can either grab the full ring buffer from the instance attributes
sw.buffer # ring buffer for data
sw.buffer_t # ring buffer for time stamps
sw.curr_i # current position of the head in the ring buffer
or, usually more conveniently, use the unfold_buffer method, which returns a
chronologically sorted array ([-1] is the most recent data point and [0] is
the oldest data point).
sw.unfold_buffer() # sorted data
sw.unfold_buffer_t() # sorted time stamps
# The above uses the following implementation
def unfold_buffer(self):
    return np.vstack(
        [self.buffer[self.curr_i :], self.buffer[: self.curr_i]]
    )
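The interplay between writing into the ring buffer and unfolding it can be traced with a small dependency-free sketch. `ListRingBuffer` is a hypothetical stand-in that uses plain Python lists instead of numpy arrays, and it assumes that `curr_i` marks the next write position, which is also the slot holding the oldest sample once the buffer has wrapped around.

```python
class ListRingBuffer:
    """Hypothetical, dependency-free stand-in for the StreamWatcher buffer."""

    def __init__(self, size: int) -> None:
        self.buffer = [0] * size
        self.curr_i = 0  # assumed: next write position (slot of the oldest sample)

    def add_samples(self, samples: list) -> None:
        for sample in samples:
            self.buffer[self.curr_i] = sample
            self.curr_i = (self.curr_i + 1) % len(self.buffer)  # wrap around

    def unfold_buffer(self) -> list:
        # Same slicing idea as np.vstack([buffer[curr_i:], buffer[:curr_i]])
        return self.buffer[self.curr_i :] + self.buffer[: self.curr_i]


rb = ListRingBuffer(size=5)
rb.add_samples([1, 2, 3, 4])
rb.add_samples([5, 6, 7])

print(rb.buffer)           # [6, 7, 3, 4, 5] -> raw storage, not chronological
print(rb.unfold_buffer())  # [3, 4, 5, 6, 7] -> oldest first, most recent last
```

Seven samples were written into five slots, so the two oldest values were overwritten and the unfolded view starts at the third sample.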
Event Loop
A class that implements a custom event loop with precise timing.
The EventLoop uses dareplane_utils.general.time.sleep_s for more precise sleep timing at the expense of CPU usage.
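The trade-off between timing precision and CPU usage can be illustrated with a hybrid sleep. The function `precise_sleep` and its `busy_margin_s` parameter are hypothetical; this is a sketch of the general technique and not the implementation of dareplane_utils.general.time.sleep_s.

```python
import time


def precise_sleep(duration_s: float, busy_margin_s: float = 0.002) -> None:
    """Hypothetical hybrid sleep: coarse time.sleep, then busy-wait the remainder."""
    deadline = time.perf_counter() + duration_s
    coarse = duration_s - busy_margin_s
    if coarse > 0:
        time.sleep(coarse)  # cheap, but may overshoot by the OS scheduler granularity
    while time.perf_counter() < deadline:
        pass  # spin: burns CPU in exchange for tighter timing


start = time.perf_counter()
precise_sleep(0.02)
elapsed = time.perf_counter() - start
```

A larger `busy_margin_s` makes the wake-up time more robust against scheduler jitter but keeps a CPU core busy for longer.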
Callbacks are the means of interacting with the event loop. There are two types of callbacks:
- Periodic callbacks: These are executed at regular intervals.
- One-time callbacks: These are executed once and then removed from the list of callbacks. One-time callbacks can furthermore be scheduled to run at a specific time in the future.
Callbacks can be any callable which takes exactly one argument: a context object, which can be of any type. This ensures that any type of input can be passed to a callback.
def no_arg_callback():
    print("Running with no args")

evloop = EventLoop(dt_s=0.1)  # process callbacks every 100ms
# for a callback with no args we use lambda to blank the callback arg
evloop.add_callback_once(lambda ctx: no_arg_callback())
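The difference between periodic and one-time callbacks can be sketched with a toy loop. `MiniEventLoop`, its `add_callback` method, the `delay_s` parameter and `run(n_iterations)` are hypothetical names for illustration only; they do not describe the EventLoop API of dareplane_utils, and the toy loop uses a plain `time.sleep` rather than precise timing.

```python
import time


class MiniEventLoop:
    """Hypothetical toy loop; not the dareplane_utils EventLoop."""

    def __init__(self, dt_s: float, ctx=None) -> None:
        self.dt_s = dt_s
        self.ctx = ctx  # context object handed to every callback
        self.periodic = []  # callables executed on every iteration
        self.once = []  # (due time, callable) pairs executed a single time

    def add_callback(self, cb) -> None:
        self.periodic.append(cb)

    def add_callback_once(self, cb, delay_s: float = 0.0) -> None:
        self.once.append((time.perf_counter() + delay_s, cb))

    def run(self, n_iterations: int) -> None:
        for _ in range(n_iterations):
            now = time.perf_counter()
            due = [item for item in self.once if item[0] <= now]
            self.once = [item for item in self.once if item[0] > now]
            for _, cb in due:
                cb(self.ctx)  # one-time callbacks run once and are dropped
            for cb in self.periodic:
                cb(self.ctx)  # periodic callbacks run on every iteration
            time.sleep(self.dt_s)


def count_periodic(ctx) -> None:
    ctx["periodic"] += 1


def count_once(ctx) -> None:
    ctx["once"] += 1


calls = {"periodic": 0, "once": 0}
loop = MiniEventLoop(dt_s=0.001, ctx=calls)
loop.add_callback(count_periodic)
loop.add_callback_once(count_once)
loop.run(n_iterations=3)
print(calls)  # {'periodic': 3, 'once': 1}
```

Passing a single context object keeps the callback signature uniform, so state such as counters or buffers can be shared without changing the loop.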
TODO
- channel names are only initialized on connection